remove deactivated_users call
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
29
30 import Ecto.Query
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
33
34 require Logger
35 require Pleroma.Constants
36
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
54 {recipients, to, cc}
55 end
56
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
63 _ -> false
64 end
65 end
66
67 defp check_actor_can_insert(_), do: true
68
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
72 end
73
74 defp check_remote_limit(_), do: true
75
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78 end
79
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82 end
83
84 defp increase_replies_count_if_reply(%{
85 "object" => %{"inReplyTo" => reply_ap_id} = object,
86 "type" => "Create"
87 }) do
88 if is_public?(object) do
89 Object.increase_replies_count(reply_ap_id)
90 end
91 end
92
93 defp increase_replies_count_if_reply(_create_data), do: :noop
94
95 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
96 @impl true
97 def persist(%{"type" => type} = object, meta) when type in @object_types do
98 with {:ok, object} <- Object.create(object) do
99 {:ok, object, meta}
100 end
101 end
102
103 @impl true
104 def persist(object, meta) do
105 with local <- Keyword.fetch!(meta, :local),
106 {recipients, _, _} <- get_recipients(object),
107 {:ok, activity} <-
108 Repo.insert(%Activity{
109 data: object,
110 local: local,
111 recipients: recipients,
112 actor: object["actor"]
113 }),
114 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
115 {:ok, _} <- maybe_create_activity_expiration(activity) do
116 {:ok, activity, meta}
117 end
118 end
119
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
125 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map),
131 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
132 # Splice in the child object if we have one.
133 activity = Maps.put_if_present(activity, :object, object)
134
135 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
136 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
137 end)
138
139 {:ok, activity}
140 else
141 %Activity{} = activity ->
142 {:ok, activity}
143
144 {:actor_check, _} ->
145 {:error, false}
146
147 {:containment, _} = error ->
148 error
149
150 {:error, _} = error ->
151 error
152
153 {:fake, true, map, recipients} ->
154 activity = %Activity{
155 data: map,
156 local: local,
157 actor: map["actor"],
158 recipients: recipients,
159 id: "pleroma:fakeid"
160 }
161
162 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
163 {:ok, activity}
164
165 {:remote_limit_pass, _} ->
166 {:error, :remote_limit}
167
168 {:reject, _} = e ->
169 {:error, e}
170 end
171 end
172
173 defp insert_activity_with_expiration(data, local, recipients) do
174 struct = %Activity{
175 data: data,
176 local: local,
177 actor: data["actor"],
178 recipients: recipients
179 }
180
181 with {:ok, activity} <- Repo.insert(struct) do
182 maybe_create_activity_expiration(activity)
183 end
184 end
185
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
188
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
191 stream_out(activity)
192 stream_out_participations(participations)
193 end
194
195 defp maybe_create_activity_expiration(
196 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
197 ) do
198 with {:ok, _job} <-
199 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
200 activity_id: activity.id,
201 expires_at: expires_at
202 }) do
203 {:ok, activity}
204 end
205 end
206
207 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
208
209 defp create_or_bump_conversation(activity, actor) do
210 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
211 %User{} = user <- User.get_cached_by_ap_id(actor) do
212 Participation.mark_as_read(user, conversation)
213 {:ok, conversation}
214 end
215 end
216
217 defp get_participations({:ok, conversation}) do
218 conversation
219 |> Repo.preload(:participations, force: true)
220 |> Map.get(:participations)
221 end
222
223 defp get_participations(_), do: []
224
225 def stream_out_participations(participations) do
226 participations =
227 participations
228 |> Repo.preload(:user)
229
230 Streamer.stream("participation", participations)
231 end
232
233 @impl true
234 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
235 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
236 conversation = Repo.preload(conversation, :participations)
237
238 last_activity_id =
239 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
240 user: user,
241 blocking_user: user
242 })
243
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
246 end
247 end
248 end
249
250 @impl true
251 def stream_out_participations(_, _), do: :noop
252
253 @impl true
254 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
255 when data_type in ["Create", "Announce", "Delete"] do
256 activity
257 |> Topics.get_activity_topics()
258 |> Streamer.stream(activity)
259 end
260
261 @impl true
262 def stream_out(_activity) do
263 :noop
264 end
265
266 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
267 def create(params, fake \\ false) do
268 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
269 result
270 end
271 end
272
273 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
274 additional = params[:additional] || %{}
275 # only accept false as false value
276 local = !(params[:local] == false)
277 published = params[:published]
278 quick_insert? = Config.get([:env]) == :benchmark
279
280 create_data =
281 make_create_data(
282 %{to: to, actor: actor, published: published, context: context, object: object},
283 additional
284 )
285
286 with {:ok, activity} <- insert(create_data, local, fake),
287 {:fake, false, activity} <- {:fake, fake, activity},
288 _ <- increase_replies_count_if_reply(create_data),
289 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
290 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
291 _ <- notify_and_stream(activity),
292 :ok <- maybe_schedule_poll_notifications(activity),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 else
296 {:quick_insert, true, activity} ->
297 {:ok, activity}
298
299 {:fake, true, activity} ->
300 {:ok, activity}
301
302 {:error, message} ->
303 Repo.rollback(message)
304 end
305 end
306
307 defp maybe_schedule_poll_notifications(activity) do
308 PollWorker.schedule_poll_end(activity)
309 :ok
310 end
311
312 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
313 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
314 additional = params[:additional] || %{}
315 # only accept false as false value
316 local = !(params[:local] == false)
317 published = params[:published]
318
319 listen_data =
320 make_listen_data(
321 %{to: to, actor: actor, published: published, context: context, object: object},
322 additional
323 )
324
325 with {:ok, activity} <- insert(listen_data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
328 {:ok, activity}
329 end
330 end
331
332 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
333 {:ok, Activity.t()} | nil | {:error, any()}
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with {:ok, result} <-
336 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
337 result
338 end
339 end
340
341 defp do_unfollow(follower, followed, activity_id, local) do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
344 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
345 {:ok, activity} <- insert(unfollow_data, local),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
348 {:ok, activity}
349 else
350 nil -> nil
351 {:error, error} -> Repo.rollback(error)
352 end
353 end
354
355 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
356 def flag(params) do
357 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
358 result
359 end
360 end
361
362 defp do_flag(
363 %{
364 actor: actor,
365 context: _context,
366 account: account,
367 statuses: statuses,
368 content: content
369 } = params
370 ) do
371 # only accept false as false value
372 local = !(params[:local] == false)
373 forward = !(params[:forward] == false)
374
375 additional = params[:additional] || %{}
376
377 additional =
378 if forward do
379 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
380 else
381 Map.merge(additional, %{"to" => [], "cc" => []})
382 end
383
384 with flag_data <- make_flag_data(params, additional),
385 {:ok, activity} <- insert(flag_data, local),
386 {:ok, stripped_activity} <- strip_report_status_data(activity),
387 _ <- notify_and_stream(activity),
388 :ok <-
389 maybe_federate(stripped_activity) do
390 User.all_superusers()
391 |> Enum.filter(fn user -> user.ap_id != actor end)
392 |> Enum.filter(fn user -> not is_nil(user.email) end)
393 |> Enum.each(fn superuser ->
394 superuser
395 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
396 |> Pleroma.Emails.Mailer.deliver_async()
397 end)
398
399 {:ok, activity}
400 else
401 {:error, error} -> Repo.rollback(error)
402 end
403 end
404
405 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
406 def move(%User{} = origin, %User{} = target, local \\ true) do
407 params = %{
408 "type" => "Move",
409 "actor" => origin.ap_id,
410 "object" => origin.ap_id,
411 "target" => target.ap_id
412 }
413
414 with true <- origin.ap_id in target.also_known_as,
415 {:ok, activity} <- insert(params, local),
416 _ <- notify_and_stream(activity) do
417 maybe_federate(activity)
418
419 BackgroundWorker.enqueue("move_following", %{
420 "origin_id" => origin.id,
421 "target_id" => target.id
422 })
423
424 {:ok, activity}
425 else
426 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
427 err -> err
428 end
429 end
430
431 def fetch_activities_for_context_query(context, opts) do
432 public = [Constants.as_public()]
433
434 recipients =
435 if opts[:user],
436 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
437 else: public
438
439 from(activity in Activity)
440 |> maybe_preload_objects(opts)
441 |> maybe_preload_bookmarks(opts)
442 |> maybe_set_thread_muted_field(opts)
443 |> restrict_blocked(opts)
444 |> restrict_blockers_visibility(opts)
445 |> restrict_recipients(recipients, opts[:user])
446 |> restrict_filtered(opts)
447 |> where(
448 [activity],
449 fragment(
450 "?->>'type' = ? and ?->>'context' = ?",
451 activity.data,
452 "Create",
453 activity.data,
454 ^context
455 )
456 )
457 |> exclude_poll_votes(opts)
458 |> exclude_id(opts)
459 |> order_by([activity], desc: activity.id)
460 end
461
462 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
463 def fetch_activities_for_context(context, opts \\ %{}) do
464 context
465 |> fetch_activities_for_context_query(opts)
466 |> Repo.all()
467 end
468
469 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
470 FlakeId.Ecto.CompatType.t() | nil
471 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
472 context
473 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
474 |> restrict_visibility(%{visibility: "direct"})
475 |> limit(1)
476 |> select([a], a.id)
477 |> Repo.one()
478 end
479
480 defp fetch_paginated_optimized(query, opts, pagination) do
481 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
482 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
483 opts = Map.put(opts, :skip_extra_order, true)
484
485 Pagination.fetch_paginated(query, opts, pagination)
486 end
487
488 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
489 list_memberships = Pleroma.List.memberships(opts[:user])
490
491 fetch_activities_query(recipients ++ list_memberships, opts)
492 |> fetch_paginated_optimized(opts, pagination)
493 |> Enum.reverse()
494 |> maybe_update_cc(list_memberships, opts[:user])
495 end
496
497 def fetch_activities_secret(recipients, opts \\ %{}, pagination \\ :keyset) do
498 list_memberships = Pleroma.List.memberships(opts[:user])
499
500 fetch_activities_query_secret(recipients ++ list_memberships, opts)
501 |> fetch_paginated_optimized(opts, pagination)
502 |> Enum.reverse()
503 end
504
505
506 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
507 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
508 opts = Map.delete(opts, :user)
509
510 [Constants.as_public()]
511 |> fetch_activities_query(opts)
512 |> restrict_unlisted(opts)
513 |> fetch_paginated_optimized(opts, pagination)
514 end
515
516 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
517 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
518 opts
519 |> Map.put(:restrict_unlisted, true)
520 |> fetch_public_or_unlisted_activities(pagination)
521 end
522
523 @valid_visibilities ~w[direct unlisted public private]
524
525 defp restrict_visibility(query, %{visibility: visibility})
526 when is_list(visibility) do
527 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 from(
529 a in query,
530 where:
531 fragment(
532 "activity_visibility(?, ?, ?) = ANY (?)",
533 a.actor,
534 a.recipients,
535 a.data,
536 ^visibility
537 )
538 )
539 else
540 Logger.error("Could not restrict visibility to #{visibility}")
541 end
542 end
543
544 defp restrict_visibility(query, %{visibility: visibility})
545 when visibility in @valid_visibilities do
546 from(
547 a in query,
548 where:
549 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
550 )
551 end
552
553 defp restrict_visibility(_query, %{visibility: visibility})
554 when visibility not in @valid_visibilities do
555 Logger.error("Could not restrict visibility to #{visibility}")
556 end
557
558 defp restrict_visibility(query, _visibility), do: query
559
560 defp exclude_visibility(query, %{exclude_visibilities: visibility})
561 when is_list(visibility) do
562 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
563 from(
564 a in query,
565 where:
566 not fragment(
567 "activity_visibility(?, ?, ?) = ANY (?)",
568 a.actor,
569 a.recipients,
570 a.data,
571 ^visibility
572 )
573 )
574 else
575 Logger.error("Could not exclude visibility to #{visibility}")
576 query
577 end
578 end
579
580 defp exclude_visibility(query, %{exclude_visibilities: visibility})
581 when visibility in @valid_visibilities do
582 from(
583 a in query,
584 where:
585 not fragment(
586 "activity_visibility(?, ?, ?) = ?",
587 a.actor,
588 a.recipients,
589 a.data,
590 ^visibility
591 )
592 )
593 end
594
595 defp exclude_visibility(query, %{exclude_visibilities: visibility})
596 when visibility not in [nil | @valid_visibilities] do
597 Logger.error("Could not exclude visibility to #{visibility}")
598 query
599 end
600
601 defp exclude_visibility(query, _visibility), do: query
602
603 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
604 do: query
605
606 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
607 do: query
608
609 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
610 from(
611 a in query,
612 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
613 )
614 end
615
616 defp restrict_thread_visibility(query, _, _), do: query
617
618 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
619 params =
620 params
621 |> Map.put(:user, reading_user)
622 |> Map.put(:actor_id, user.ap_id)
623
624 %{
625 godmode: params[:godmode],
626 reading_user: reading_user
627 }
628 |> user_activities_recipients()
629 |> fetch_activities(params)
630 |> Enum.reverse()
631 end
632
633 def fetch_user_activities(user, reading_user, params \\ %{})
634
635 def fetch_user_activities(user, reading_user, %{total: true} = params) do
636 result = fetch_activities_for_user(user, reading_user, params)
637
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
639 end
640
641 def fetch_user_activities(user, reading_user, params) do
642 user
643 |> fetch_activities_for_user(reading_user, params)
644 |> Enum.reverse()
645 end
646
647 defp fetch_activities_for_user(user, reading_user, params) do
648 params =
649 params
650 |> Map.put(:type, ["Create", "Announce"])
651 |> Map.put(:user, reading_user)
652 |> Map.put(:actor_id, user.ap_id)
653 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
654
655 params =
656 if User.blocks?(reading_user, user) do
657 params
658 else
659 params
660 |> Map.put(:blocking_user, reading_user)
661 |> Map.put(:muting_user, reading_user)
662 end
663
664 pagination_type = Map.get(params, :pagination_type) || :keyset
665
666 %{
667 godmode: params[:godmode],
668 reading_user: reading_user
669 }
670 |> user_activities_recipients()
671 |> fetch_activities(params, pagination_type)
672 end
673
674 def fetch_statuses(reading_user, %{total: true} = params) do
675 result = fetch_activities_for_reading_user(reading_user, params)
676 Keyword.put(result, :items, Enum.reverse(result[:items]))
677 end
678
679 def fetch_statuses(reading_user, params) do
680 reading_user
681 |> fetch_activities_for_reading_user(params)
682 |> Enum.reverse()
683 end
684
685 defp fetch_activities_for_reading_user(reading_user, params) do
686 params = Map.put(params, :type, ["Create", "Announce"])
687
688 %{
689 godmode: params[:godmode],
690 reading_user: reading_user
691 }
692 |> user_activities_recipients()
693 |> fetch_activities(params, :offset)
694 end
695
696 defp user_activities_recipients(%{godmode: true}), do: []
697
698 defp user_activities_recipients(%{reading_user: reading_user}) do
699 if reading_user do
700 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
701 else
702 [Constants.as_public()]
703 end
704 end
705
706 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
707 raise "Can't use the child object without preloading!"
708 end
709
710 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
711 from(
712 [activity, object] in query,
713 where:
714 fragment(
715 "?->>'type' != ? or ?->>'actor' != ?",
716 activity.data,
717 "Announce",
718 object.data,
719 ^actor
720 )
721 )
722 end
723
724 defp restrict_announce_object_actor(query, _), do: query
725
726 defp restrict_since(query, %{since_id: ""}), do: query
727
728 defp restrict_since(query, %{since_id: since_id}) do
729 from(activity in query, where: activity.id > ^since_id)
730 end
731
732 defp restrict_since(query, _), do: query
733
734 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
735 raise_on_missing_preload()
736 end
737
738 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
739 from(
740 [_activity, object] in query,
741 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
742 )
743 end
744
745 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
746 restrict_embedded_tag_any(query, %{tag: tag})
747 end
748
749 defp restrict_embedded_tag_all(query, _), do: query
750
751 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
752 raise_on_missing_preload()
753 end
754
755 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
756 from(
757 [_activity, object] in query,
758 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
759 )
760 end
761
762 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
763 restrict_embedded_tag_any(query, %{tag: [tag]})
764 end
765
766 defp restrict_embedded_tag_any(query, _), do: query
767
768 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
769 raise_on_missing_preload()
770 end
771
772 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
773 from(
774 [_activity, object] in query,
775 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
776 )
777 end
778
779 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
780 when is_binary(tag_reject) do
781 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
782 end
783
784 defp restrict_embedded_tag_reject_any(query, _), do: query
785
786 defp object_ids_query_for_tags(tags) do
787 from(hto in "hashtags_objects")
788 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
789 |> where([hto, ht], ht.name in ^tags)
790 |> select([hto], hto.object_id)
791 |> distinct([hto], true)
792 end
793
794 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
795 raise_on_missing_preload()
796 end
797
798 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
799 restrict_hashtag_any(query, %{tag: single_tag})
800 end
801
802 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
803 from(
804 [_activity, object] in query,
805 where:
806 fragment(
807 """
808 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
809 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
810 AND hashtags_objects.object_id = ?) @> ?
811 """,
812 ^tags,
813 object.id,
814 ^tags
815 )
816 )
817 end
818
819 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
820 restrict_hashtag_all(query, %{tag_all: [tag]})
821 end
822
823 defp restrict_hashtag_all(query, _), do: query
824
825 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
826 raise_on_missing_preload()
827 end
828
829 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
830 hashtag_ids =
831 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
832 |> Repo.all()
833
834 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
835 from(
836 [_activity, object] in query,
837 join: hto in "hashtags_objects",
838 on: hto.object_id == object.id,
839 where: hto.hashtag_id in ^hashtag_ids,
840 distinct: [desc: object.id],
841 order_by: [desc: object.id]
842 )
843 end
844
845 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
846 restrict_hashtag_any(query, %{tag: [tag]})
847 end
848
849 defp restrict_hashtag_any(query, _), do: query
850
851 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
852 raise_on_missing_preload()
853 end
854
855 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
856 from(
857 [_activity, object] in query,
858 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
859 )
860 end
861
862 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
863 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
864 end
865
866 defp restrict_hashtag_reject_any(query, _), do: query
867
868 defp raise_on_missing_preload do
869 raise "Can't use the child object without preloading!"
870 end
871
872 defp restrict_recipients(query, [], _user), do: query
873
874 defp restrict_recipients(query, recipients, nil) do
875 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
876 end
877
878 defp restrict_recipients(query, recipients, user) do
879 from(
880 activity in query,
881 where: fragment("? && ?", ^recipients, activity.recipients),
882 or_where: activity.actor == ^user.ap_id
883 )
884 end
885
886 defp restrict_local(query, %{local_only: true}) do
887 from(activity in query, where: activity.local == true)
888 end
889
890 defp restrict_local(query, _), do: query
891
892 defp restrict_remote(query, %{remote: true}) do
893 from(activity in query, where: activity.local == false)
894 end
895
896 defp restrict_remote(query, _), do: query
897
898 defp restrict_actor(query, %{actor_id: actor_id}) do
899 from(activity in query, where: activity.actor == ^actor_id)
900 end
901
902 defp restrict_actor(query, _), do: query
903
904 defp restrict_type(query, %{type: type}) when is_binary(type) do
905 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
906 end
907
908 defp restrict_type(query, %{type: type}) do
909 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
910 end
911
912 defp restrict_type(query, _), do: query
913
914 defp restrict_state(query, %{state: state}) do
915 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
916 end
917
918 defp restrict_state(query, _), do: query
919
920 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
921 from(
922 [_activity, object] in query,
923 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
924 )
925 end
926
927 defp restrict_favorited_by(query, _), do: query
928
929 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
930 raise "Can't use the child object without preloading!"
931 end
932
933 defp restrict_media(query, %{only_media: true}) do
934 from(
935 [activity, object] in query,
936 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
937 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
938 )
939 end
940
941 defp restrict_media(query, _), do: query
942
943 defp restrict_replies(query, %{exclude_replies: true}) do
944 from(
945 [_activity, object] in query,
946 where: fragment("?->>'inReplyTo' is null", object.data)
947 )
948 end
949
950 defp restrict_replies(query, %{
951 reply_filtering_user: %User{} = user,
952 reply_visibility: "self"
953 }) do
954 from(
955 [activity, object] in query,
956 where:
957 fragment(
958 "?->>'inReplyTo' is null OR ? = ANY(?)",
959 object.data,
960 ^user.ap_id,
961 activity.recipients
962 )
963 )
964 end
965
966 defp restrict_replies(query, %{
967 reply_filtering_user: %User{} = user,
968 reply_visibility: "following"
969 }) do
970 from(
971 [activity, object] in query,
972 where:
973 fragment(
974 """
975 ?->>'type' != 'Create' -- This isn't a Create
976 OR ?->>'inReplyTo' is null -- this isn't a reply
977 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
978 -- unless they are the author (because authors
979 -- are also part of the recipients). This leads
980 -- to a bug that self-replies by friends won't
981 -- show up.
982 OR ? = ? -- The actor is us
983 """,
984 activity.data,
985 object.data,
986 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
987 activity.recipients,
988 activity.actor,
989 activity.actor,
990 ^user.ap_id
991 )
992 )
993 end
994
995 defp restrict_replies(query, _), do: query
996
997 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
998 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
999 end
1000
1001 defp restrict_reblogs(query, _), do: query
1002
1003 defp restrict_muted(query, %{with_muted: true}), do: query
1004
1005 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1006 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1007
1008 query =
1009 from([activity] in query,
1010 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1011 where:
1012 fragment(
1013 "not (?->'to' \\?| ?) or ? = ?",
1014 activity.data,
1015 ^mutes,
1016 activity.actor,
1017 ^user.ap_id
1018 )
1019 )
1020
1021 unless opts[:skip_preload] do
1022 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1023 else
1024 query
1025 end
1026 end
1027
1028 defp restrict_muted(query, _), do: query
1029
1030 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1031 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1032 domain_blocks = user.domain_blocks || []
1033
1034 following_ap_ids = User.get_friends_ap_ids(user)
1035
1036 query =
1037 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1038
1039 from(
1040 [activity, object: o] in query,
1041 # You don't block the author
1042 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1043
1044 # You don't block any recipients, and didn't author the post
1045 where:
1046 fragment(
1047 "((not (? && ?)) or ? = ?)",
1048 activity.recipients,
1049 ^blocked_ap_ids,
1050 activity.actor,
1051 ^user.ap_id
1052 ),
1053
1054 # You don't block the domain of any recipients, and didn't author the post
1055 where:
1056 fragment(
1057 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1058 activity.recipients,
1059 ^domain_blocks,
1060 activity.actor,
1061 ^user.ap_id
1062 ),
1063
1064 # It's not a boost of a user you block
1065 where:
1066 fragment(
1067 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1068 activity.data,
1069 activity.data,
1070 ^blocked_ap_ids
1071 ),
1072
1073 # You don't block the author's domain, and also don't follow the author
1074 where:
1075 fragment(
1076 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1077 activity.actor,
1078 ^domain_blocks,
1079 activity.actor,
1080 ^following_ap_ids
1081 ),
1082
1083 # Same as above, but checks the Object
1084 where:
1085 fragment(
1086 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1087 o.data,
1088 ^domain_blocks,
1089 o.data,
1090 ^following_ap_ids
1091 )
1092 )
1093 end
1094
1095 defp restrict_blocked(query, _), do: query
1096
1097 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1098 if Config.get([:activitypub, :blockers_visible]) == true do
1099 query
1100 else
1101 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1102
1103 from(
1104 activity in query,
1105 # The author doesn't block you
1106 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1107
1108 # It's not a boost of a user that blocks you
1109 where:
1110 fragment(
1111 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1112 activity.data,
1113 activity.data,
1114 ^blocker_ap_ids
1115 )
1116 )
1117 end
1118 end
1119
1120 defp restrict_blockers_visibility(query, _), do: query
1121
1122 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1123 from(
1124 activity in query,
1125 where:
1126 fragment(
1127 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1128 activity.data,
1129 ^[Constants.as_public()]
1130 )
1131 )
1132 end
1133
1134 defp restrict_unlisted(query, _), do: query
1135
1136 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1137 from(
1138 [activity, object: o] in query,
1139 where:
1140 fragment(
1141 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1142 activity.data,
1143 activity.data,
1144 activity.data,
1145 ^ids
1146 )
1147 )
1148 end
1149
1150 defp restrict_pinned(query, _), do: query
1151
1152 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1153 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1154
1155 from(
1156 activity in query,
1157 where:
1158 fragment(
1159 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1160 activity.data,
1161 activity.actor,
1162 ^muted_reblogs
1163 )
1164 )
1165 end
1166
1167 defp restrict_muted_reblogs(query, _), do: query
1168
1169 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1170 from(
1171 activity in query,
1172 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1173 )
1174 end
1175
1176 defp restrict_instance(query, _), do: query
1177
1178 defp restrict_filtered(query, %{user: %User{} = user}) do
1179 case Filter.compose_regex(user) do
1180 nil ->
1181 query
1182
1183 regex ->
1184 from([activity, object] in query,
1185 where:
1186 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1187 activity.actor == ^user.ap_id
1188 )
1189 end
1190 end
1191
1192 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1193 restrict_filtered(query, %{user: user})
1194 end
1195
1196 defp restrict_filtered(query, _), do: query
1197
1198 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1199
1200 defp exclude_poll_votes(query, _) do
1201 if has_named_binding?(query, :object) do
1202 from([activity, object: o] in query,
1203 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1204 )
1205 else
1206 query
1207 end
1208 end
1209
1210 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1211
1212 defp exclude_chat_messages(query, _) do
1213 if has_named_binding?(query, :object) do
1214 from([activity, object: o] in query,
1215 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1216 )
1217 else
1218 query
1219 end
1220 end
1221
1222 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1223
1224 defp exclude_invisible_actors(query, _opts) do
1225 invisible_ap_ids =
1226 User.Query.build(%{invisible: true, select: [:ap_id]})
1227 |> Repo.all()
1228 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1229
1230 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1231 end
1232
1233 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1234 from(activity in query, where: activity.id != ^id)
1235 end
1236
1237 defp exclude_id(query, _), do: query
1238
1239 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1240
1241 defp maybe_preload_objects(query, _) do
1242 query
1243 |> Activity.with_preloaded_object()
1244 end
1245
1246 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1247
1248 defp maybe_preload_bookmarks(query, opts) do
1249 query
1250 |> Activity.with_preloaded_bookmark(opts[:user])
1251 end
1252
1253 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1254 query
1255 |> Activity.with_preloaded_report_notes()
1256 end
1257
1258 defp maybe_preload_report_notes(query, _), do: query
1259
1260 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1261
1262 defp maybe_set_thread_muted_field(query, opts) do
1263 query
1264 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1265 end
1266
1267 defp maybe_order(query, %{order: :desc}) do
1268 query
1269 |> order_by(desc: :id)
1270 end
1271
1272 defp maybe_order(query, %{order: :asc}) do
1273 query
1274 |> order_by(asc: :id)
1275 end
1276
1277 defp maybe_order(query, _), do: query
1278
1279 defp normalize_fetch_activities_query_opts(opts) do
1280 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1281 case opts[key] do
1282 value when is_bitstring(value) ->
1283 Map.put(opts, key, Hashtag.normalize_name(value))
1284
1285 value when is_list(value) ->
1286 normalized_value =
1287 value
1288 |> Enum.map(&Hashtag.normalize_name/1)
1289 |> Enum.uniq()
1290
1291 Map.put(opts, key, normalized_value)
1292
1293 _ ->
1294 opts
1295 end
1296 end)
1297 end
1298
1299 defp fetch_activities_query_ap_ids_ops(opts) do
1300 source_user = opts[:muting_user]
1301 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1302
1303 ap_id_relationships =
1304 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1305 [:block | ap_id_relationships]
1306 else
1307 ap_id_relationships
1308 end
1309
1310 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1311
1312 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1313 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1314
1315 restrict_muted_reblogs_opts =
1316 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1317
1318 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1319 end
1320
1321 def fetch_activities_query_secret(recipients, opts \\ %{}) do
1322 opts = normalize_fetch_activities_query_opts(opts)
1323
1324 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1325 fetch_activities_query_ap_ids_ops(opts)
1326
1327 config = %{
1328 skip_thread_containment: true
1329 }
1330
1331 query =
1332 Activity
1333 |> maybe_preload_objects(opts)
1334 |> maybe_preload_bookmarks(opts)
1335 |> maybe_preload_report_notes(opts)
1336 |> maybe_set_thread_muted_field(opts)
1337 |> maybe_order(opts)
1338 |> restrict_recipients(recipients, opts[:user])
1339 |> restrict_replies(opts)
1340 |> restrict_since(opts)
1341 |> restrict_local(opts)
1342 |> restrict_remote(opts)
1343 |> restrict_actor(opts)
1344 |> restrict_type(opts)
1345 |> restrict_state(opts)
1346 |> restrict_favorited_by(opts)
1347 |> restrict_blocked(restrict_blocked_opts)
1348 |> restrict_blockers_visibility(opts)
1349 |> restrict_muted(restrict_muted_opts)
1350 |> restrict_filtered(opts)
1351 |> restrict_media(opts)
1352 |> restrict_visibility(opts)
1353 |> restrict_thread_visibility(opts, config)
1354 |> restrict_reblogs(opts)
1355 |> restrict_pinned(opts)
1356 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1357 |> restrict_instance(opts)
1358 |> restrict_announce_object_actor(opts)
1359 |> restrict_filtered(opts)
1360 |> exclude_poll_votes(opts)
1361 |> exclude_chat_messages(opts)
1362 |> exclude_invisible_actors(opts)
1363 |> exclude_visibility(opts)
1364
1365 if Config.feature_enabled?(:improved_hashtag_timeline) do
1366 query
1367 |> restrict_hashtag_any(opts)
1368 |> restrict_hashtag_all(opts)
1369 |> restrict_hashtag_reject_any(opts)
1370 else
1371 query
1372 |> restrict_embedded_tag_any(opts)
1373 |> restrict_embedded_tag_all(opts)
1374 |> restrict_embedded_tag_reject_any(opts)
1375 end
1376 end
1377
1378 def fetch_activities_query(recipients, opts \\ %{}) do
1379 opts = normalize_fetch_activities_query_opts(opts)
1380
1381 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1382 fetch_activities_query_ap_ids_ops(opts)
1383
1384 config = %{
1385 skip_thread_containment: true
1386 }
1387
1388 query =
1389 Activity
1390 |> maybe_preload_objects(opts)
1391 |> maybe_preload_bookmarks(opts)
1392 |> maybe_preload_report_notes(opts)
1393 |> maybe_set_thread_muted_field(opts)
1394 |> maybe_order(opts)
1395 |> restrict_recipients(recipients, opts[:user])
1396 |> restrict_replies(opts)
1397 |> restrict_since(opts)
1398 |> restrict_local(opts)
1399 |> restrict_remote(opts)
1400 |> restrict_actor(opts)
1401 |> restrict_type(opts)
1402 |> restrict_state(opts)
1403 |> restrict_favorited_by(opts)
1404 |> restrict_blocked(restrict_blocked_opts)
1405 |> restrict_blockers_visibility(opts)
1406 |> restrict_muted(restrict_muted_opts)
1407 |> restrict_filtered(opts)
1408 |> restrict_media(opts)
1409 |> restrict_visibility(opts)
1410 |> restrict_thread_visibility(opts, config)
1411 |> restrict_reblogs(opts)
1412 |> restrict_pinned(opts)
1413 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1414 |> restrict_instance(opts)
1415 |> restrict_announce_object_actor(opts)
1416 |> restrict_filtered(opts)
1417 |> exclude_poll_votes(opts)
1418 |> exclude_chat_messages(opts)
1419 |> exclude_invisible_actors(opts)
1420 |> exclude_visibility(opts)
1421
1422 if Config.feature_enabled?(:improved_hashtag_timeline) do
1423 query
1424 |> restrict_hashtag_any(opts)
1425 |> restrict_hashtag_all(opts)
1426 |> restrict_hashtag_reject_any(opts)
1427 else
1428 query
1429 |> restrict_embedded_tag_any(opts)
1430 |> restrict_embedded_tag_all(opts)
1431 |> restrict_embedded_tag_reject_any(opts)
1432 end
1433 end
1434
1435 @doc """
1436 Fetch favorites activities of user with order by sort adds to favorites
1437 """
1438 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1439 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1440 user.ap_id
1441 |> Activity.Queries.by_actor()
1442 |> Activity.Queries.by_type("Like")
1443 |> Activity.with_joined_object()
1444 |> Object.with_joined_activity()
1445 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1446 |> order_by([like, _, _], desc_nulls_last: like.id)
1447 |> Pagination.fetch_paginated(
1448 Map.merge(params, %{skip_order: true}),
1449 pagination
1450 )
1451 end
1452
1453 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1454 Enum.map(activities, fn
1455 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1456 if Enum.any?(bcc, &(&1 in list_memberships)) do
1457 update_in(activity.data["cc"], &[user_ap_id | &1])
1458 else
1459 activity
1460 end
1461
1462 activity ->
1463 activity
1464 end)
1465 end
1466
1467 defp maybe_update_cc(activities, _, _), do: activities
1468
1469 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1470 from(activity in query,
1471 where:
1472 fragment("? && ?", activity.recipients, ^recipients) or
1473 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1474 ^Constants.as_public() in activity.recipients)
1475 )
1476 end
1477
1478 def fetch_activities_bounded(
1479 recipients,
1480 recipients_with_public,
1481 opts \\ %{},
1482 pagination \\ :keyset
1483 ) do
1484 fetch_activities_query([], opts)
1485 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1486 |> Pagination.fetch_paginated(opts, pagination)
1487 |> Enum.reverse()
1488 end
1489
1490 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1491 def upload(file, opts \\ []) do
1492 with {:ok, data} <- Upload.store(file, opts) do
1493 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1494
1495 Repo.insert(%Object{data: obj_data})
1496 end
1497 end
1498
1499 @spec get_actor_url(any()) :: binary() | nil
1500 defp get_actor_url(url) when is_binary(url), do: url
1501 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1502
1503 defp get_actor_url(url) when is_list(url) do
1504 url
1505 |> List.first()
1506 |> get_actor_url()
1507 end
1508
1509 defp get_actor_url(_url), do: nil
1510
1511 defp normalize_image(%{"url" => url}) do
1512 %{
1513 "type" => "Image",
1514 "url" => [%{"href" => url}]
1515 }
1516 end
1517
1518 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1519 defp normalize_image(_), do: nil
1520
1521 defp object_to_user_data(data) do
1522 fields =
1523 data
1524 |> Map.get("attachment", [])
1525 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1526 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1527
1528 emojis =
1529 data
1530 |> Map.get("tag", [])
1531 |> Enum.filter(fn
1532 %{"type" => "Emoji"} -> true
1533 _ -> false
1534 end)
1535 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1536 {String.trim(name, ":"), url}
1537 end)
1538
1539 is_locked = data["manuallyApprovesFollowers"] || false
1540 capabilities = data["capabilities"] || %{}
1541 accepts_chat_messages = capabilities["acceptsChatMessages"]
1542 data = Transmogrifier.maybe_fix_user_object(data)
1543 is_discoverable = data["discoverable"] || false
1544 invisible = data["invisible"] || false
1545 actor_type = data["type"] || "Person"
1546
1547 featured_address = data["featured"]
1548 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1549
1550 public_key =
1551 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1552 data["publicKey"]["publicKeyPem"]
1553 else
1554 nil
1555 end
1556
1557 shared_inbox =
1558 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1559 data["endpoints"]["sharedInbox"]
1560 else
1561 nil
1562 end
1563
1564 user_data = %{
1565 ap_id: data["id"],
1566 uri: get_actor_url(data["url"]),
1567 ap_enabled: true,
1568 banner: normalize_image(data["image"]),
1569 fields: fields,
1570 emoji: emojis,
1571 is_locked: is_locked,
1572 is_discoverable: is_discoverable,
1573 invisible: invisible,
1574 avatar: normalize_image(data["icon"]),
1575 name: data["name"],
1576 follower_address: data["followers"],
1577 following_address: data["following"],
1578 featured_address: featured_address,
1579 bio: data["summary"] || "",
1580 actor_type: actor_type,
1581 also_known_as: Map.get(data, "alsoKnownAs", []),
1582 public_key: public_key,
1583 inbox: data["inbox"],
1584 shared_inbox: shared_inbox,
1585 accepts_chat_messages: accepts_chat_messages,
1586 pinned_objects: pinned_objects
1587 }
1588
1589 # nickname can be nil because of virtual actors
1590 if data["preferredUsername"] do
1591 Map.put(
1592 user_data,
1593 :nickname,
1594 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1595 )
1596 else
1597 Map.put(user_data, :nickname, nil)
1598 end
1599 end
1600
1601 def fetch_follow_information_for_user(user) do
1602 with {:ok, following_data} <-
1603 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1604 {:ok, hide_follows} <- collection_private(following_data),
1605 {:ok, followers_data} <-
1606 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1607 {:ok, hide_followers} <- collection_private(followers_data) do
1608 {:ok,
1609 %{
1610 hide_follows: hide_follows,
1611 follower_count: normalize_counter(followers_data["totalItems"]),
1612 following_count: normalize_counter(following_data["totalItems"]),
1613 hide_followers: hide_followers
1614 }}
1615 else
1616 {:error, _} = e -> e
1617 e -> {:error, e}
1618 end
1619 end
1620
1621 defp normalize_counter(counter) when is_integer(counter), do: counter
1622 defp normalize_counter(_), do: 0
1623
1624 def maybe_update_follow_information(user_data) do
1625 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1626 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1627 {_, true} <-
1628 {:collections_available,
1629 !!(user_data[:following_address] && user_data[:follower_address])},
1630 {:ok, info} <-
1631 fetch_follow_information_for_user(user_data) do
1632 info = Map.merge(user_data[:info] || %{}, info)
1633
1634 user_data
1635 |> Map.put(:info, info)
1636 else
1637 {:user_type_check, false} ->
1638 user_data
1639
1640 {:collections_available, false} ->
1641 user_data
1642
1643 {:enabled, false} ->
1644 user_data
1645
1646 e ->
1647 Logger.error(
1648 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1649 )
1650
1651 user_data
1652 end
1653 end
1654
1655 defp collection_private(%{"first" => %{"type" => type}})
1656 when type in ["CollectionPage", "OrderedCollectionPage"],
1657 do: {:ok, false}
1658
1659 defp collection_private(%{"first" => first}) do
1660 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1661 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1662 {:ok, false}
1663 else
1664 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1665 {:error, _} = e -> e
1666 e -> {:error, e}
1667 end
1668 end
1669
1670 defp collection_private(_data), do: {:ok, true}
1671
1672 def user_data_from_user_object(data) do
1673 with {:ok, data} <- MRF.filter(data) do
1674 {:ok, object_to_user_data(data)}
1675 else
1676 e -> {:error, e}
1677 end
1678 end
1679
1680 def fetch_and_prepare_user_from_ap_id(ap_id) do
1681 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1682 {:ok, data} <- user_data_from_user_object(data) do
1683 {:ok, maybe_update_follow_information(data)}
1684 else
1685 # If this has been deleted, only log a debug and not an error
1686 {:error, "Object has been deleted" = e} ->
1687 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1688 {:error, e}
1689
1690 {:error, {:reject, reason} = e} ->
1691 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1692 {:error, e}
1693
1694 {:error, e} ->
1695 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1696 {:error, e}
1697 end
1698 end
1699
1700 def maybe_handle_clashing_nickname(data) do
1701 with nickname when is_binary(nickname) <- data[:nickname],
1702 %User{} = old_user <- User.get_by_nickname(nickname),
1703 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1704 Logger.info(
1705 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1706 )
1707
1708 old_user
1709 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1710 |> User.update_and_set_cache()
1711 else
1712 {:ap_id_comparison, true} ->
1713 Logger.info(
1714 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1715 )
1716
1717 _ ->
1718 nil
1719 end
1720 end
1721
1722 def pin_data_from_featured_collection(%{
1723 "type" => type,
1724 "orderedItems" => objects
1725 })
1726 when type in ["OrderedCollection", "Collection"] do
1727 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1728 end
1729
1730 def fetch_and_prepare_featured_from_ap_id(nil) do
1731 {:ok, %{}}
1732 end
1733
1734 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1735 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1736 {:ok, pin_data_from_featured_collection(data)}
1737 else
1738 e ->
1739 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1740 {:ok, %{}}
1741 end
1742 end
1743
1744 def pinned_fetch_task(nil), do: nil
1745
1746 def pinned_fetch_task(%{pinned_objects: pins}) do
1747 if Enum.all?(pins, fn {ap_id, _} ->
1748 Object.get_cached_by_ap_id(ap_id) ||
1749 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1750 end) do
1751 :ok
1752 else
1753 :error
1754 end
1755 end
1756
1757 def make_user_from_ap_id(ap_id) do
1758 user = User.get_cached_by_ap_id(ap_id)
1759
1760 if user && !User.ap_enabled?(user) do
1761 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1762 else
1763 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1764 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1765
1766 if user do
1767 user
1768 |> User.remote_user_changeset(data)
1769 |> User.update_and_set_cache()
1770 else
1771 maybe_handle_clashing_nickname(data)
1772
1773 data
1774 |> User.remote_user_changeset()
1775 |> Repo.insert()
1776 |> User.set_cache()
1777 end
1778 end
1779 end
1780 end
1781
1782 def make_user_from_nickname(nickname) do
1783 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1784 make_user_from_ap_id(ap_id)
1785 else
1786 _e -> {:error, "No AP id in WebFinger"}
1787 end
1788 end
1789
1790 # filter out broken threads
1791 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1792 entire_thread_visible_for_user?(activity, user)
1793 end
1794
1795 # do post-processing on a specific activity
1796 def contain_activity(%Activity{} = activity, %User{} = user) do
1797 contain_broken_threads(activity, user)
1798 end
1799
1800 def fetch_direct_messages_query do
1801 Activity
1802 |> restrict_type(%{type: "Create"})
1803 |> restrict_visibility(%{visibility: "direct"})
1804 |> order_by([activity], asc: activity.id)
1805 end
1806 end