1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
67 defp check_actor_can_insert(_), do: true
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
74 defp check_remote_limit(_), do: true
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
84 defp increase_replies_count_if_reply(%{
85 "object" => %{"inReplyTo" => reply_ap_id} = object,
88 if is_public?(object) do
89 Object.increase_replies_count(reply_ap_id)
93 defp increase_replies_count_if_reply(_create_data), do: :noop
95 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
97 def persist(%{"type" => type} = object, meta) when type in @object_types do
98 with {:ok, object} <- Object.create(object) do
104 def persist(object, meta) do
105 with local <- Keyword.fetch!(meta, :local),
106 {recipients, _, _} <- get_recipients(object),
108 Repo.insert(%Activity{
111 recipients: recipients,
112 actor: object["actor"]
114 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
115 {:ok, _} <- maybe_create_activity_expiration(activity) do
116 {:ok, activity, meta}
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
125 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map),
131 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
132 # Splice in the child object if we have one.
133 activity = Maps.put_if_present(activity, :object, object)
135 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
136 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
141 %Activity{} = activity ->
147 {:containment, _} = error ->
150 {:error, _} = error ->
153 {:fake, true, map, recipients} ->
154 activity = %Activity{
158 recipients: recipients,
162 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
165 {:remote_limit_pass, _} ->
166 {:error, :remote_limit}
173 defp insert_activity_with_expiration(data, local, recipients) do
177 actor: data["actor"],
178 recipients: recipients
181 with {:ok, activity} <- Repo.insert(struct) do
182 maybe_create_activity_expiration(activity)
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
192 stream_out_participations(participations)
195 defp maybe_create_activity_expiration(
196 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
199 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
200 activity_id: activity.id,
201 expires_at: expires_at
207 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
209 defp create_or_bump_conversation(activity, actor) do
210 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
211 %User{} = user <- User.get_cached_by_ap_id(actor) do
212 Participation.mark_as_read(user, conversation)
217 defp get_participations({:ok, conversation}) do
219 |> Repo.preload(:participations, force: true)
220 |> Map.get(:participations)
223 defp get_participations(_), do: []
225 def stream_out_participations(participations) do
228 |> Repo.preload(:user)
230 Streamer.stream("participation", participations)
234 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
235 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
236 conversation = Repo.preload(conversation, :participations)
239 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
251 def stream_out_participations(_, _), do: :noop
254 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
255 when data_type in ["Create", "Announce", "Delete"] do
257 |> Topics.get_activity_topics()
258 |> Streamer.stream(activity)
262 def stream_out(_activity) do
266 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
267 def create(params, fake \\ false) do
268 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
273 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
274 additional = params[:additional] || %{}
275 # only accept false as false value
276 local = !(params[:local] == false)
277 published = params[:published]
278 quick_insert? = Config.get([:env]) == :benchmark
282 %{to: to, actor: actor, published: published, context: context, object: object},
286 with {:ok, activity} <- insert(create_data, local, fake),
287 {:fake, false, activity} <- {:fake, fake, activity},
288 _ <- increase_replies_count_if_reply(create_data),
289 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
290 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
291 _ <- notify_and_stream(activity),
292 :ok <- maybe_schedule_poll_notifications(activity),
293 :ok <- maybe_federate(activity) do
296 {:quick_insert, true, activity} ->
299 {:fake, true, activity} ->
303 Repo.rollback(message)
307 defp maybe_schedule_poll_notifications(activity) do
308 PollWorker.schedule_poll_end(activity)
312 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
313 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
314 additional = params[:additional] || %{}
315 # only accept false as false value
316 local = !(params[:local] == false)
317 published = params[:published]
321 %{to: to, actor: actor, published: published, context: context, object: object},
325 with {:ok, activity} <- insert(listen_data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
332 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
333 {:ok, Activity.t()} | nil | {:error, any()}
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with {:ok, result} <-
336 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
341 defp do_unfollow(follower, followed, activity_id, local) do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
344 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
345 {:ok, activity} <- insert(unfollow_data, local),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
351 {:error, error} -> Repo.rollback(error)
355 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
357 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
371 # only accept false as false value
372 local = !(params[:local] == false)
373 forward = !(params[:forward] == false)
375 additional = params[:additional] || %{}
379 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
381 Map.merge(additional, %{"to" => [], "cc" => []})
384 with flag_data <- make_flag_data(params, additional),
385 {:ok, activity} <- insert(flag_data, local),
386 {:ok, stripped_activity} <- strip_report_status_data(activity),
387 _ <- notify_and_stream(activity),
389 maybe_federate(stripped_activity) do
390 User.all_superusers()
391 |> Enum.filter(fn user -> user.ap_id != actor end)
392 |> Enum.filter(fn user -> not is_nil(user.email) end)
393 |> Enum.each(fn superuser ->
395 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
396 |> Pleroma.Emails.Mailer.deliver_async()
401 {:error, error} -> Repo.rollback(error)
405 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
406 def move(%User{} = origin, %User{} = target, local \\ true) do
409 "actor" => origin.ap_id,
410 "object" => origin.ap_id,
411 "target" => target.ap_id
414 with true <- origin.ap_id in target.also_known_as,
415 {:ok, activity} <- insert(params, local),
416 _ <- notify_and_stream(activity) do
417 maybe_federate(activity)
419 BackgroundWorker.enqueue("move_following", %{
420 "origin_id" => origin.id,
421 "target_id" => target.id
426 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
431 def fetch_activities_for_context_query(context, opts) do
432 public = [Constants.as_public()]
436 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
439 from(activity in Activity)
440 |> maybe_preload_objects(opts)
441 |> maybe_preload_bookmarks(opts)
442 |> maybe_set_thread_muted_field(opts)
443 |> restrict_blocked(opts)
444 |> restrict_blockers_visibility(opts)
445 |> restrict_recipients(recipients, opts[:user])
446 |> restrict_filtered(opts)
450 "?->>'type' = ? and ?->>'context' = ?",
457 |> exclude_poll_votes(opts)
459 |> order_by([activity], desc: activity.id)
462 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
463 def fetch_activities_for_context(context, opts \\ %{}) do
465 |> fetch_activities_for_context_query(opts)
469 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
470 FlakeId.Ecto.CompatType.t() | nil
471 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
473 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
474 |> restrict_visibility(%{visibility: "direct"})
480 defp fetch_paginated_optimized(query, opts, pagination) do
481 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
482 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
483 opts = Map.put(opts, :skip_extra_order, true)
485 Pagination.fetch_paginated(query, opts, pagination)
488 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
489 list_memberships = Pleroma.List.memberships(opts[:user])
491 fetch_activities_query(recipients ++ list_memberships, opts)
492 |> fetch_paginated_optimized(opts, pagination)
494 |> maybe_update_cc(list_memberships, opts[:user])
497 def fetch_activities_secret(recipients, opts \\ %{}, pagination \\ :keyset) do
498 list_memberships = Pleroma.List.memberships(opts[:user])
500 fetch_activities_query_secret(recipients ++ list_memberships, opts)
501 |> fetch_paginated_optimized(opts, pagination)
506 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
507 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
508 opts = Map.delete(opts, :user)
510 [Constants.as_public()]
511 |> fetch_activities_query(opts)
512 |> restrict_unlisted(opts)
513 |> fetch_paginated_optimized(opts, pagination)
516 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
517 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
519 |> Map.put(:restrict_unlisted, true)
520 |> fetch_public_or_unlisted_activities(pagination)
523 @valid_visibilities ~w[direct unlisted public private]
525 defp restrict_visibility(query, %{visibility: visibility})
526 when is_list(visibility) do
527 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
532 "activity_visibility(?, ?, ?) = ANY (?)",
540 Logger.error("Could not restrict visibility to #{visibility}")
544 defp restrict_visibility(query, %{visibility: visibility})
545 when visibility in @valid_visibilities do
549 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
553 defp restrict_visibility(_query, %{visibility: visibility})
554 when visibility not in @valid_visibilities do
555 Logger.error("Could not restrict visibility to #{visibility}")
558 defp restrict_visibility(query, _visibility), do: query
560 defp exclude_visibility(query, %{exclude_visibilities: visibility})
561 when is_list(visibility) do
562 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
567 "activity_visibility(?, ?, ?) = ANY (?)",
575 Logger.error("Could not exclude visibility to #{visibility}")
580 defp exclude_visibility(query, %{exclude_visibilities: visibility})
581 when visibility in @valid_visibilities do
586 "activity_visibility(?, ?, ?) = ?",
595 defp exclude_visibility(query, %{exclude_visibilities: visibility})
596 when visibility not in [nil | @valid_visibilities] do
597 Logger.error("Could not exclude visibility to #{visibility}")
601 defp exclude_visibility(query, _visibility), do: query
603 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
606 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
609 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
612 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
616 defp restrict_thread_visibility(query, _, _), do: query
618 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
621 |> Map.put(:user, reading_user)
622 |> Map.put(:actor_id, user.ap_id)
625 godmode: params[:godmode],
626 reading_user: reading_user
628 |> user_activities_recipients()
629 |> fetch_activities(params)
633 def fetch_user_activities(user, reading_user, params \\ %{})
635 def fetch_user_activities(user, reading_user, %{total: true} = params) do
636 result = fetch_activities_for_user(user, reading_user, params)
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
641 def fetch_user_activities(user, reading_user, params) do
643 |> fetch_activities_for_user(reading_user, params)
647 defp fetch_activities_for_user(user, reading_user, params) do
650 |> Map.put(:type, ["Create", "Announce"])
651 |> Map.put(:user, reading_user)
652 |> Map.put(:actor_id, user.ap_id)
653 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
656 if User.blocks?(reading_user, user) do
660 |> Map.put(:blocking_user, reading_user)
661 |> Map.put(:muting_user, reading_user)
664 pagination_type = Map.get(params, :pagination_type) || :keyset
667 godmode: params[:godmode],
668 reading_user: reading_user
670 |> user_activities_recipients()
671 |> fetch_activities(params, pagination_type)
674 def fetch_statuses(reading_user, %{total: true} = params) do
675 result = fetch_activities_for_reading_user(reading_user, params)
676 Keyword.put(result, :items, Enum.reverse(result[:items]))
679 def fetch_statuses(reading_user, params) do
681 |> fetch_activities_for_reading_user(params)
685 defp fetch_activities_for_reading_user(reading_user, params) do
686 params = Map.put(params, :type, ["Create", "Announce"])
689 godmode: params[:godmode],
690 reading_user: reading_user
692 |> user_activities_recipients()
693 |> fetch_activities(params, :offset)
696 defp user_activities_recipients(%{godmode: true}), do: []
698 defp user_activities_recipients(%{reading_user: reading_user}) do
700 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
702 [Constants.as_public()]
706 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
707 raise "Can't use the child object without preloading!"
710 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
712 [activity, object] in query,
715 "?->>'type' != ? or ?->>'actor' != ?",
724 defp restrict_announce_object_actor(query, _), do: query
726 defp restrict_since(query, %{since_id: ""}), do: query
728 defp restrict_since(query, %{since_id: since_id}) do
729 from(activity in query, where: activity.id > ^since_id)
732 defp restrict_since(query, _), do: query
734 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
735 raise_on_missing_preload()
738 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
740 [_activity, object] in query,
741 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
745 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
746 restrict_embedded_tag_any(query, %{tag: tag})
749 defp restrict_embedded_tag_all(query, _), do: query
751 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
752 raise_on_missing_preload()
755 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
757 [_activity, object] in query,
758 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
762 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
763 restrict_embedded_tag_any(query, %{tag: [tag]})
766 defp restrict_embedded_tag_any(query, _), do: query
768 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
769 raise_on_missing_preload()
772 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
774 [_activity, object] in query,
775 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
779 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
780 when is_binary(tag_reject) do
781 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
784 defp restrict_embedded_tag_reject_any(query, _), do: query
786 defp object_ids_query_for_tags(tags) do
787 from(hto in "hashtags_objects")
788 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
789 |> where([hto, ht], ht.name in ^tags)
790 |> select([hto], hto.object_id)
791 |> distinct([hto], true)
794 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
795 raise_on_missing_preload()
798 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
799 restrict_hashtag_any(query, %{tag: single_tag})
802 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
804 [_activity, object] in query,
808 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
809 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
810 AND hashtags_objects.object_id = ?) @> ?
819 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
820 restrict_hashtag_all(query, %{tag_all: [tag]})
823 defp restrict_hashtag_all(query, _), do: query
825 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
826 raise_on_missing_preload()
829 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
831 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
834 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
836 [_activity, object] in query,
837 join: hto in "hashtags_objects",
838 on: hto.object_id == object.id,
839 where: hto.hashtag_id in ^hashtag_ids,
840 distinct: [desc: object.id],
841 order_by: [desc: object.id]
845 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
846 restrict_hashtag_any(query, %{tag: [tag]})
849 defp restrict_hashtag_any(query, _), do: query
851 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
852 raise_on_missing_preload()
855 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
857 [_activity, object] in query,
858 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
862 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
863 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
866 defp restrict_hashtag_reject_any(query, _), do: query
868 defp raise_on_missing_preload do
869 raise "Can't use the child object without preloading!"
872 defp restrict_recipients(query, [], _user), do: query
874 defp restrict_recipients(query, recipients, nil) do
875 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
878 defp restrict_recipients(query, recipients, user) do
881 where: fragment("? && ?", ^recipients, activity.recipients),
882 or_where: activity.actor == ^user.ap_id
886 defp restrict_local(query, %{local_only: true}) do
887 from(activity in query, where: activity.local == true)
890 defp restrict_local(query, _), do: query
892 defp restrict_remote(query, %{remote: true}) do
893 from(activity in query, where: activity.local == false)
896 defp restrict_remote(query, _), do: query
898 defp restrict_actor(query, %{actor_id: actor_id}) do
899 from(activity in query, where: activity.actor == ^actor_id)
902 defp restrict_actor(query, _), do: query
904 defp restrict_type(query, %{type: type}) when is_binary(type) do
905 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
908 defp restrict_type(query, %{type: type}) do
909 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
912 defp restrict_type(query, _), do: query
914 defp restrict_state(query, %{state: state}) do
915 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
918 defp restrict_state(query, _), do: query
920 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
922 [_activity, object] in query,
923 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
927 defp restrict_favorited_by(query, _), do: query
929 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
930 raise "Can't use the child object without preloading!"
933 defp restrict_media(query, %{only_media: true}) do
935 [activity, object] in query,
936 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
937 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
941 defp restrict_media(query, _), do: query
943 defp restrict_replies(query, %{exclude_replies: true}) do
945 [_activity, object] in query,
946 where: fragment("?->>'inReplyTo' is null", object.data)
950 defp restrict_replies(query, %{
951 reply_filtering_user: %User{} = user,
952 reply_visibility: "self"
955 [activity, object] in query,
958 "?->>'inReplyTo' is null OR ? = ANY(?)",
966 defp restrict_replies(query, %{
967 reply_filtering_user: %User{} = user,
968 reply_visibility: "following"
971 [activity, object] in query,
975 ?->>'type' != 'Create' -- This isn't a Create
976 OR ?->>'inReplyTo' is null -- this isn't a reply
977 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
978 -- unless they are the author (because authors
979 -- are also part of the recipients). This leads
980 -- to a bug that self-replies by friends won't
982 OR ? = ? -- The actor is us
986 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
995 defp restrict_replies(query, _), do: query
997 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
998 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1001 defp restrict_reblogs(query, _), do: query
1003 defp restrict_muted(query, %{with_muted: true}), do: query
1005 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1006 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1009 from([activity] in query,
1010 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1013 "not (?->'to' \\?| ?) or ? = ?",
1021 unless opts[:skip_preload] do
1022 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1028 defp restrict_muted(query, _), do: query
1030 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1031 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1032 domain_blocks = user.domain_blocks || []
1034 following_ap_ids = User.get_friends_ap_ids(user)
1037 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1040 [activity, object: o] in query,
1041 # You don't block the author
1042 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1044 # You don't block any recipients, and didn't author the post
1047 "((not (? && ?)) or ? = ?)",
1048 activity.recipients,
1054 # You don't block the domain of any recipients, and didn't author the post
1057 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1058 activity.recipients,
1064 # It's not a boost of a user you block
1067 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1073 # You don't block the author's domain, and also don't follow the author
1076 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1083 # Same as above, but checks the Object
1086 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1095 defp restrict_blocked(query, _), do: query
1097 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1098 if Config.get([:activitypub, :blockers_visible]) == true do
1101 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1105 # The author doesn't block you
1106 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1108 # It's not a boost of a user that blocks you
1111 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1120 defp restrict_blockers_visibility(query, _), do: query
1122 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1127 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1129 ^[Constants.as_public()]
1134 defp restrict_unlisted(query, _), do: query
1136 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1138 [activity, object: o] in query,
1141 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1150 defp restrict_pinned(query, _), do: query
1152 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1153 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1159 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1167 defp restrict_muted_reblogs(query, _), do: query
1169 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1172 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1176 defp restrict_instance(query, _), do: query
1178 defp restrict_filtered(query, %{user: %User{} = user}) do
1179 case Filter.compose_regex(user) do
1184 from([activity, object] in query,
1186 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1187 activity.actor == ^user.ap_id
1192 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1193 restrict_filtered(query, %{user: user})
1196 defp restrict_filtered(query, _), do: query
1198 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1200 defp exclude_poll_votes(query, _) do
1201 if has_named_binding?(query, :object) do
1202 from([activity, object: o] in query,
1203 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1210 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1212 defp exclude_chat_messages(query, _) do
1213 if has_named_binding?(query, :object) do
1214 from([activity, object: o] in query,
1215 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1222 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1224 defp exclude_invisible_actors(query, _opts) do
1226 User.Query.build(%{invisible: true, select: [:ap_id]})
1228 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1230 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1233 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1234 from(activity in query, where: activity.id != ^id)
1237 defp exclude_id(query, _), do: query
1239 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1241 defp maybe_preload_objects(query, _) do
1243 |> Activity.with_preloaded_object()
1246 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1248 defp maybe_preload_bookmarks(query, opts) do
1250 |> Activity.with_preloaded_bookmark(opts[:user])
1253 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1255 |> Activity.with_preloaded_report_notes()
1258 defp maybe_preload_report_notes(query, _), do: query
1260 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1262 defp maybe_set_thread_muted_field(query, opts) do
1264 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1267 defp maybe_order(query, %{order: :desc}) do
1269 |> order_by(desc: :id)
1272 defp maybe_order(query, %{order: :asc}) do
1274 |> order_by(asc: :id)
1277 defp maybe_order(query, _), do: query
1279 defp normalize_fetch_activities_query_opts(opts) do
1280 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1282 value when is_bitstring(value) ->
1283 Map.put(opts, key, Hashtag.normalize_name(value))
1285 value when is_list(value) ->
1288 |> Enum.map(&Hashtag.normalize_name/1)
1291 Map.put(opts, key, normalized_value)
1299 defp fetch_activities_query_ap_ids_ops(opts) do
1300 source_user = opts[:muting_user]
1301 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1303 ap_id_relationships =
1304 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1305 [:block | ap_id_relationships]
1310 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1312 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1313 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1315 restrict_muted_reblogs_opts =
1316 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1318 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1321 def fetch_activities_query_secret(recipients, opts \\ %{}) do
1322 opts = normalize_fetch_activities_query_opts(opts)
1324 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1325 fetch_activities_query_ap_ids_ops(opts)
1328 skip_thread_containment: true
1333 |> maybe_preload_objects(opts)
1334 |> maybe_preload_bookmarks(opts)
1335 |> maybe_preload_report_notes(opts)
1336 |> maybe_set_thread_muted_field(opts)
1337 |> maybe_order(opts)
1338 |> restrict_recipients(recipients, opts[:user])
1339 |> restrict_replies(opts)
1340 |> restrict_since(opts)
1341 |> restrict_local(opts)
1342 |> restrict_remote(opts)
1343 |> restrict_actor(opts)
1344 |> restrict_type(opts)
1345 |> restrict_state(opts)
1346 |> restrict_favorited_by(opts)
1347 |> restrict_blocked(restrict_blocked_opts)
1348 |> restrict_blockers_visibility(opts)
1349 |> restrict_muted(restrict_muted_opts)
1350 |> restrict_filtered(opts)
1351 |> restrict_media(opts)
1352 |> restrict_visibility(opts)
1353 |> restrict_thread_visibility(opts, config)
1354 |> restrict_reblogs(opts)
1355 |> restrict_pinned(opts)
1356 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1357 |> restrict_instance(opts)
1358 |> restrict_announce_object_actor(opts)
1359 |> restrict_filtered(opts)
1360 |> exclude_poll_votes(opts)
1361 |> exclude_chat_messages(opts)
1362 |> exclude_invisible_actors(opts)
1363 |> exclude_visibility(opts)
1365 if Config.feature_enabled?(:improved_hashtag_timeline) do
1367 |> restrict_hashtag_any(opts)
1368 |> restrict_hashtag_all(opts)
1369 |> restrict_hashtag_reject_any(opts)
1372 |> restrict_embedded_tag_any(opts)
1373 |> restrict_embedded_tag_all(opts)
1374 |> restrict_embedded_tag_reject_any(opts)
1378 def fetch_activities_query(recipients, opts \\ %{}) do
1379 opts = normalize_fetch_activities_query_opts(opts)
1381 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1382 fetch_activities_query_ap_ids_ops(opts)
1385 skip_thread_containment: true
1390 |> maybe_preload_objects(opts)
1391 |> maybe_preload_bookmarks(opts)
1392 |> maybe_preload_report_notes(opts)
1393 |> maybe_set_thread_muted_field(opts)
1394 |> maybe_order(opts)
1395 |> restrict_recipients(recipients, opts[:user])
1396 |> restrict_replies(opts)
1397 |> restrict_since(opts)
1398 |> restrict_local(opts)
1399 |> restrict_remote(opts)
1400 |> restrict_actor(opts)
1401 |> restrict_type(opts)
1402 |> restrict_state(opts)
1403 |> restrict_favorited_by(opts)
1404 |> restrict_blocked(restrict_blocked_opts)
1405 |> restrict_blockers_visibility(opts)
1406 |> restrict_muted(restrict_muted_opts)
1407 |> restrict_filtered(opts)
1408 |> restrict_media(opts)
1409 |> restrict_visibility(opts)
1410 |> restrict_thread_visibility(opts, config)
1411 |> restrict_reblogs(opts)
1412 |> restrict_pinned(opts)
1413 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1414 |> restrict_instance(opts)
1415 |> restrict_announce_object_actor(opts)
1416 |> restrict_filtered(opts)
1417 |> exclude_poll_votes(opts)
1418 |> exclude_chat_messages(opts)
1419 |> exclude_invisible_actors(opts)
1420 |> exclude_visibility(opts)
1422 if Config.feature_enabled?(:improved_hashtag_timeline) do
1424 |> restrict_hashtag_any(opts)
1425 |> restrict_hashtag_all(opts)
1426 |> restrict_hashtag_reject_any(opts)
1429 |> restrict_embedded_tag_any(opts)
1430 |> restrict_embedded_tag_all(opts)
1431 |> restrict_embedded_tag_reject_any(opts)
1436 Fetch favorites activities of user with order by sort adds to favorites
1438 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1439 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1441 |> Activity.Queries.by_actor()
1442 |> Activity.Queries.by_type("Like")
1443 |> Activity.with_joined_object()
1444 |> Object.with_joined_activity()
1445 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1446 |> order_by([like, _, _], desc_nulls_last: like.id)
1447 |> Pagination.fetch_paginated(
1448 Map.merge(params, %{skip_order: true}),
1453 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1454 Enum.map(activities, fn
1455 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1456 if Enum.any?(bcc, &(&1 in list_memberships)) do
1457 update_in(activity.data["cc"], &[user_ap_id | &1])
1467 defp maybe_update_cc(activities, _, _), do: activities
1469 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1470 from(activity in query,
1472 fragment("? && ?", activity.recipients, ^recipients) or
1473 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1474 ^Constants.as_public() in activity.recipients)
1478 def fetch_activities_bounded(
1480 recipients_with_public,
1482 pagination \\ :keyset
1484 fetch_activities_query([], opts)
1485 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1486 |> Pagination.fetch_paginated(opts, pagination)
1490 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1491 def upload(file, opts \\ []) do
1492 with {:ok, data} <- Upload.store(file, opts) do
1493 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1495 Repo.insert(%Object{data: obj_data})
1499 @spec get_actor_url(any()) :: binary() | nil
1500 defp get_actor_url(url) when is_binary(url), do: url
1501 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1503 defp get_actor_url(url) when is_list(url) do
1509 defp get_actor_url(_url), do: nil
1511 defp normalize_image(%{"url" => url}) do
1514 "url" => [%{"href" => url}]
1518 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1519 defp normalize_image(_), do: nil
1521 defp object_to_user_data(data) do
1524 |> Map.get("attachment", [])
1525 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1526 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1530 |> Map.get("tag", [])
1532 %{"type" => "Emoji"} -> true
1535 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1536 {String.trim(name, ":"), url}
1539 is_locked = data["manuallyApprovesFollowers"] || false
1540 capabilities = data["capabilities"] || %{}
1541 accepts_chat_messages = capabilities["acceptsChatMessages"]
1542 data = Transmogrifier.maybe_fix_user_object(data)
1543 is_discoverable = data["discoverable"] || false
1544 invisible = data["invisible"] || false
1545 actor_type = data["type"] || "Person"
1547 featured_address = data["featured"]
1548 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1551 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1552 data["publicKey"]["publicKeyPem"]
1558 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1559 data["endpoints"]["sharedInbox"]
1566 uri: get_actor_url(data["url"]),
1568 banner: normalize_image(data["image"]),
1571 is_locked: is_locked,
1572 is_discoverable: is_discoverable,
1573 invisible: invisible,
1574 avatar: normalize_image(data["icon"]),
1576 follower_address: data["followers"],
1577 following_address: data["following"],
1578 featured_address: featured_address,
1579 bio: data["summary"] || "",
1580 actor_type: actor_type,
1581 also_known_as: Map.get(data, "alsoKnownAs", []),
1582 public_key: public_key,
1583 inbox: data["inbox"],
1584 shared_inbox: shared_inbox,
1585 accepts_chat_messages: accepts_chat_messages,
1586 pinned_objects: pinned_objects
1589 # nickname can be nil because of virtual actors
1590 if data["preferredUsername"] do
1594 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1597 Map.put(user_data, :nickname, nil)
1601 def fetch_follow_information_for_user(user) do
1602 with {:ok, following_data} <-
1603 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1604 {:ok, hide_follows} <- collection_private(following_data),
1605 {:ok, followers_data} <-
1606 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1607 {:ok, hide_followers} <- collection_private(followers_data) do
1610 hide_follows: hide_follows,
1611 follower_count: normalize_counter(followers_data["totalItems"]),
1612 following_count: normalize_counter(following_data["totalItems"]),
1613 hide_followers: hide_followers
1616 {:error, _} = e -> e
1621 defp normalize_counter(counter) when is_integer(counter), do: counter
1622 defp normalize_counter(_), do: 0
1624 def maybe_update_follow_information(user_data) do
1625 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1626 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1628 {:collections_available,
1629 !!(user_data[:following_address] && user_data[:follower_address])},
1631 fetch_follow_information_for_user(user_data) do
1632 info = Map.merge(user_data[:info] || %{}, info)
1635 |> Map.put(:info, info)
1637 {:user_type_check, false} ->
1640 {:collections_available, false} ->
1643 {:enabled, false} ->
1648 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1655 defp collection_private(%{"first" => %{"type" => type}})
1656 when type in ["CollectionPage", "OrderedCollectionPage"],
1659 defp collection_private(%{"first" => first}) do
1660 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1661 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1664 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1665 {:error, _} = e -> e
1670 defp collection_private(_data), do: {:ok, true}
1672 def user_data_from_user_object(data) do
1673 with {:ok, data} <- MRF.filter(data) do
1674 {:ok, object_to_user_data(data)}
1680 def fetch_and_prepare_user_from_ap_id(ap_id) do
1681 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1682 {:ok, data} <- user_data_from_user_object(data) do
1683 {:ok, maybe_update_follow_information(data)}
1685 # If this has been deleted, only log a debug and not an error
1686 {:error, "Object has been deleted" = e} ->
1687 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1690 {:error, {:reject, reason} = e} ->
1691 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1695 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1700 def maybe_handle_clashing_nickname(data) do
1701 with nickname when is_binary(nickname) <- data[:nickname],
1702 %User{} = old_user <- User.get_by_nickname(nickname),
1703 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1705 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1709 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1710 |> User.update_and_set_cache()
1712 {:ap_id_comparison, true} ->
1714 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1722 def pin_data_from_featured_collection(%{
1724 "orderedItems" => objects
1726 when type in ["OrderedCollection", "Collection"] do
1727 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1730 def fetch_and_prepare_featured_from_ap_id(nil) do
1734 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1735 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1736 {:ok, pin_data_from_featured_collection(data)}
1739 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1744 def pinned_fetch_task(nil), do: nil
1746 def pinned_fetch_task(%{pinned_objects: pins}) do
1747 if Enum.all?(pins, fn {ap_id, _} ->
1748 Object.get_cached_by_ap_id(ap_id) ||
1749 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1757 def make_user_from_ap_id(ap_id) do
1758 user = User.get_cached_by_ap_id(ap_id)
1760 if user && !User.ap_enabled?(user) do
1761 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1763 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1764 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1768 |> User.remote_user_changeset(data)
1769 |> User.update_and_set_cache()
1771 maybe_handle_clashing_nickname(data)
1774 |> User.remote_user_changeset()
1782 def make_user_from_nickname(nickname) do
1783 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1784 make_user_from_ap_id(ap_id)
1786 _e -> {:error, "No AP id in WebFinger"}
1790 # filter out broken threads
1791 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1792 entire_thread_visible_for_user?(activity, user)
1795 # do post-processing on a specific activity
1796 def contain_activity(%Activity{} = activity, %User{} = user) do
1797 contain_broken_threads(activity, user)
1800 def fetch_direct_messages_query do
1802 |> restrict_type(%{type: "Create"})
1803 |> restrict_visibility(%{visibility: "direct"})
1804 |> order_by([activity], asc: activity.id)