1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
68 defp check_actor_can_insert(_), do: true
# Compares the length of the embedded object's "content" (as counted by
# `String.length/1`) with the configured `[:instance, :remote_limit]`.
# Returns `true` when the content fits, and also for any activity that does
# not carry a non-nil "content" inside an embedded "object" map.
defp check_remote_limit(%{"object" => %{"content" => body}}) when body != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

defp check_remote_limit(_activity), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` without touching the counter.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.update_last_status_at/1` for `actor` when `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` unchanged.
"""
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
98 defp increase_replies_count_if_reply(_create_data), do: :noop
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
113 Repo.insert(%Activity{
116 recipients: recipients,
117 actor: object["actor"]
119 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
149 %Activity{} = activity ->
155 {:containment, _} = error ->
158 {:error, _} = error ->
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
166 recipients: recipients,
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
181 defp insert_activity_with_expiration(data, local, recipients) do
185 actor: data["actor"],
186 recipients: recipients
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
199 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
200 Activity.get_create_by_object_ap_id_with_object(id)
206 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
207 participations = get_participations(conversation)
209 stream_out_participations(participations)
212 defp maybe_create_activity_expiration(
213 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
216 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
217 activity_id: activity.id,
218 expires_at: expires_at
224 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
226 defp create_or_bump_conversation(activity, actor) do
227 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
228 %User{} = user <- User.get_cached_by_ap_id(actor) do
229 Participation.mark_as_read(user, conversation)
234 defp get_participations({:ok, conversation}) do
236 |> Repo.preload(:participations, force: true)
237 |> Map.get(:participations)
240 defp get_participations(_), do: []
242 def stream_out_participations(participations) do
245 |> Repo.preload(:user)
247 Streamer.stream("participation", participations)
251 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
252 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
253 conversation = Repo.preload(conversation, :participations)
256 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
261 if last_activity_id do
262 stream_out_participations(conversation.participations)
268 def stream_out_participations(_, _), do: :noop
271 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
272 when data_type in ["Create", "Announce", "Delete", "Update"] do
274 |> Topics.get_activity_topics()
275 |> Streamer.stream(activity)
279 def stream_out(_activity) do
283 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
284 def create(params, fake \\ false) do
285 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
290 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
291 additional = params[:additional] || %{}
292 # only accept false as false value
293 local = !(params[:local] == false)
294 published = params[:published]
295 quick_insert? = Config.get([:env]) == :benchmark
299 %{to: to, actor: actor, published: published, context: context, object: object},
303 with {:ok, activity} <- insert(create_data, local, fake),
304 {:fake, false, activity} <- {:fake, fake, activity},
305 _ <- increase_replies_count_if_reply(create_data),
306 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
307 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
308 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
309 _ <- notify_and_stream(activity),
310 :ok <- maybe_schedule_poll_notifications(activity),
311 :ok <- maybe_federate(activity) do
314 {:quick_insert, true, activity} ->
317 {:fake, true, activity} ->
321 Repo.rollback(message)
325 defp maybe_schedule_poll_notifications(activity) do
326 PollWorker.schedule_poll_end(activity)
330 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
331 {:ok, Activity.t()} | nil | {:error, any()}
332 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
333 with {:ok, result} <-
334 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
339 defp do_unfollow(follower, followed, activity_id, local)
341 defp do_unfollow(follower, followed, activity_id, local) when local == true do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
344 {:ok, activity} <- insert(unfollow_data, local),
345 {:ok, _activity} <- Repo.delete(follow_activity),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
351 {:error, error} -> Repo.rollback(error)
355 defp do_unfollow(follower, followed, activity_id, false) do
356 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
357 # uses deterministic ids for follows.
358 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
359 {:ok, _activity} <- Repo.delete(follow_activity),
360 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
361 unfollow_activity <- make_unfollow_activity(unfollow_data, false),
362 _ <- notify_and_stream(unfollow_activity) do
363 {:ok, unfollow_activity}
366 {:error, error} -> Repo.rollback(error)
370 defp make_unfollow_activity(data, local) do
371 {recipients, _, _} = get_recipients(data)
376 actor: data["actor"],
377 recipients: recipients
381 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
383 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
397 # only accept false as false value
398 local = !(params[:local] == false)
399 forward = !(params[:forward] == false)
401 additional = params[:additional] || %{}
405 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
407 Map.merge(additional, %{"to" => [], "cc" => []})
410 with flag_data <- make_flag_data(params, additional),
411 {:ok, activity} <- insert(flag_data, local),
412 {:ok, stripped_activity} <- strip_report_status_data(activity),
413 _ <- notify_and_stream(activity),
415 maybe_federate(stripped_activity) do
416 User.all_superusers()
417 |> Enum.filter(fn user -> user.ap_id != actor end)
418 |> Enum.filter(fn user -> not is_nil(user.email) end)
419 |> Enum.each(fn superuser ->
421 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
422 |> Pleroma.Emails.Mailer.deliver_async()
427 {:error, error} -> Repo.rollback(error)
431 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
432 def move(%User{} = origin, %User{} = target, local \\ true) do
435 "actor" => origin.ap_id,
436 "object" => origin.ap_id,
437 "target" => target.ap_id,
438 "to" => [origin.follower_address]
441 with true <- origin.ap_id in target.also_known_as,
442 {:ok, activity} <- insert(params, local),
443 _ <- notify_and_stream(activity) do
444 maybe_federate(activity)
446 BackgroundWorker.enqueue("move_following", %{
447 "origin_id" => origin.id,
448 "target_id" => target.id
453 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
458 def fetch_activities_for_context_query(context, opts) do
459 public = [Constants.as_public()]
463 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
466 from(activity in Activity)
467 |> maybe_preload_objects(opts)
468 |> maybe_preload_bookmarks(opts)
469 |> maybe_set_thread_muted_field(opts)
470 |> restrict_blocked(opts)
471 |> restrict_blockers_visibility(opts)
472 |> restrict_recipients(recipients, opts[:user])
473 |> restrict_filtered(opts)
477 "?->>'type' = ? and ?->>'context' = ?",
484 |> exclude_poll_votes(opts)
486 |> order_by([activity], desc: activity.id)
489 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
490 def fetch_activities_for_context(context, opts \\ %{}) do
492 |> fetch_activities_for_context_query(opts)
496 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
497 FlakeId.Ecto.CompatType.t() | nil
498 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
500 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
501 |> restrict_visibility(%{visibility: "direct"})
# Paginates `query` through `Pagination.fetch_paginated/3` with
# `:skip_extra_order` forced to `true` in `opts`.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
#   and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
#
# A leftover debugging call that rendered the query to SQL and printed it on
# every fetch used to live here; it has been removed so that queries (and their
# bound parameters) are no longer dumped to stdout.
defp fetch_paginated_optimized(query, opts, pagination) do
  opts = Map.put(opts, :skip_extra_order, true)

  Pagination.fetch_paginated(query, opts, pagination)
end
516 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
517 list_memberships = Pleroma.List.memberships(opts[:user])
519 fetch_activities_query(recipients ++ list_memberships, opts)
520 |> fetch_paginated_optimized(opts, pagination)
522 |> maybe_update_cc(list_memberships, opts[:user])
525 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
526 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
527 includes_local_public = Map.get(opts, :includes_local_public, false)
529 opts = Map.delete(opts, :user)
531 intended_recipients =
532 if includes_local_public do
533 [Constants.as_public(), as_local_public()]
535 [Constants.as_public()]
539 |> fetch_activities_query(opts)
540 |> restrict_unlisted(opts)
541 |> fetch_paginated_optimized(opts, pagination)
544 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
545 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
547 |> Map.put(:restrict_unlisted, true)
548 |> fetch_public_or_unlisted_activities(pagination)
551 @valid_visibilities ~w[direct unlisted public private]
553 defp restrict_visibility(query, %{visibility: visibility})
554 when is_list(visibility) do
555 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
560 "activity_visibility(?, ?, ?) = ANY (?)",
568 Logger.error("Could not restrict visibility to #{visibility}")
572 defp restrict_visibility(query, %{visibility: visibility})
573 when visibility in @valid_visibilities do
577 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
581 defp restrict_visibility(_query, %{visibility: visibility})
582 when visibility not in @valid_visibilities do
583 Logger.error("Could not restrict visibility to #{visibility}")
586 defp restrict_visibility(query, _visibility), do: query
588 defp exclude_visibility(query, %{exclude_visibilities: visibility})
589 when is_list(visibility) do
590 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
595 "activity_visibility(?, ?, ?) = ANY (?)",
603 Logger.error("Could not exclude visibility to #{visibility}")
608 defp exclude_visibility(query, %{exclude_visibilities: visibility})
609 when visibility in @valid_visibilities do
614 "activity_visibility(?, ?, ?) = ?",
623 defp exclude_visibility(query, %{exclude_visibilities: visibility})
624 when visibility not in [nil | @valid_visibilities] do
625 Logger.error("Could not exclude visibility to #{visibility}")
629 defp exclude_visibility(query, _visibility), do: query
631 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
634 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
637 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
638 local_public = as_local_public()
642 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
646 defp restrict_thread_visibility(query, _, _), do: query
648 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
651 |> Map.put(:user, reading_user)
652 |> Map.put(:actor_id, user.ap_id)
655 godmode: params[:godmode],
656 reading_user: reading_user
658 |> user_activities_recipients()
659 |> fetch_activities(params)
663 def fetch_user_activities(user, reading_user, params \\ %{})
665 def fetch_user_activities(user, reading_user, %{total: true} = params) do
666 result = fetch_activities_for_user(user, reading_user, params)
668 Keyword.put(result, :items, Enum.reverse(result[:items]))
671 def fetch_user_activities(user, reading_user, params) do
673 |> fetch_activities_for_user(reading_user, params)
677 defp fetch_activities_for_user(user, reading_user, params) do
680 |> Map.put(:type, ["Create", "Announce"])
681 |> Map.put(:user, reading_user)
682 |> Map.put(:actor_id, user.ap_id)
683 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
686 if User.blocks?(reading_user, user) do
690 |> Map.put(:blocking_user, reading_user)
691 |> Map.put(:muting_user, reading_user)
694 pagination_type = Map.get(params, :pagination_type) || :keyset
697 godmode: params[:godmode],
698 reading_user: reading_user
700 |> user_activities_recipients()
701 |> fetch_activities(params, pagination_type)
704 def fetch_statuses(reading_user, %{total: true} = params) do
705 result = fetch_activities_for_reading_user(reading_user, params)
706 Keyword.put(result, :items, Enum.reverse(result[:items]))
709 def fetch_statuses(reading_user, params) do
711 |> fetch_activities_for_reading_user(params)
715 defp fetch_activities_for_reading_user(reading_user, params) do
716 params = Map.put(params, :type, ["Create", "Announce"])
719 godmode: params[:godmode],
720 reading_user: reading_user
722 |> user_activities_recipients()
723 |> fetch_activities(params, :offset)
726 defp user_activities_recipients(%{godmode: true}), do: []
728 defp user_activities_recipients(%{reading_user: reading_user}) do
729 if not is_nil(reading_user) and reading_user.local do
731 Constants.as_public(),
733 reading_user.ap_id | User.following(reading_user)
736 [Constants.as_public()]
740 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
741 raise "Can't use the child object without preloading!"
744 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
746 [activity, object] in query,
749 "?->>'type' != ? or ?->>'actor' != ?",
758 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than the given `:since_id`.
# An empty-string `:since_id`, or opts without that key, leave the query as is.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: lower_bound}) do
  where(query, [activity], activity.id > ^lower_bound)
end

defp restrict_since(query, _opts), do: query
768 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
769 raise_on_missing_preload()
772 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
774 [_activity, object] in query,
775 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
779 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
780 restrict_embedded_tag_any(query, %{tag: tag})
783 defp restrict_embedded_tag_all(query, _), do: query
785 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
786 raise_on_missing_preload()
789 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
791 [_activity, object] in query,
792 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
796 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
797 restrict_embedded_tag_any(query, %{tag: [tag]})
800 defp restrict_embedded_tag_any(query, _), do: query
802 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
803 raise_on_missing_preload()
806 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
808 [_activity, object] in query,
809 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
813 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
814 when is_binary(tag_reject) do
815 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
818 defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query selecting the distinct `object_id`s from "hashtags_objects"
# whose joined hashtag name is one of `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
828 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
829 raise_on_missing_preload()
832 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
833 restrict_hashtag_any(query, %{tag: single_tag})
836 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
838 [_activity, object] in query,
842 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
843 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
844 AND hashtags_objects.object_id = ?) @> ?
853 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
854 restrict_hashtag_all(query, %{tag_all: [tag]})
857 defp restrict_hashtag_all(query, _), do: query
859 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
860 raise_on_missing_preload()
863 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
865 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
868 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
870 [_activity, object] in query,
871 join: hto in "hashtags_objects",
872 on: hto.object_id == object.id,
873 where: hto.hashtag_id in ^hashtag_ids,
874 distinct: [desc: object.id],
875 order_by: [desc: object.id]
879 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
880 restrict_hashtag_any(query, %{tag: [tag]})
883 defp restrict_hashtag_any(query, _), do: query
885 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
886 raise_on_missing_preload()
889 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
891 [_activity, object] in query,
892 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
896 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
897 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
900 defp restrict_hashtag_reject_any(query, _), do: query
902 defp raise_on_missing_preload do
903 raise "Can't use the child object without preloading!"
906 defp restrict_recipients(query, [], _user), do: query
908 defp restrict_recipients(query, recipients, nil) do
909 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
912 defp restrict_recipients(query, recipients, user) do
915 where: fragment("? && ?", ^recipients, activity.recipients),
916 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged `local == true`;
# any other opts leave the query untouched.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
# With `remote: true`, keeps only activities flagged `local == false`;
# any other opts leave the query untouched.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
# When `:actor_id` is present in opts, keeps only activities whose `actor`
# column equals it; otherwise returns the query unchanged.
defp restrict_actor(query, %{actor_id: wanted_actor}) do
  where(query, [activity], activity.actor == ^wanted_actor)
end

defp restrict_actor(query, _opts), do: query
# Filters on the activity's JSON "type" field.
# A binary `:type` is compared with `=`; any other `:type` value is bound as-is
# and compared with `= ANY(?)`. Without `:type` the query is returned unchanged.
defp restrict_type(query, %{type: single_type}) when is_binary(single_type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^single_type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
# When `:state` is present in opts, keeps only activities whose JSON "state"
# field equals it; otherwise returns the query unchanged.
defp restrict_state(query, %{state: wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _opts), do: query
954 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
956 [_activity, object] in query,
957 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
961 defp restrict_favorited_by(query, _), do: query
963 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
964 raise "Can't use the child object without preloading!"
967 defp restrict_media(query, %{only_media: true}) do
969 [activity, object] in query,
970 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
971 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
975 defp restrict_media(query, _), do: query
977 defp restrict_replies(query, %{exclude_replies: true}) do
979 [_activity, object] in query,
980 where: fragment("?->>'inReplyTo' is null", object.data)
984 defp restrict_replies(query, %{
985 reply_filtering_user: %User{} = user,
986 reply_visibility: "self"
989 [activity, object] in query,
992 "?->>'inReplyTo' is null OR ? = ANY(?)",
1000 defp restrict_replies(query, %{
1001 reply_filtering_user: %User{} = user,
1002 reply_visibility: "following"
1005 [activity, object] in query,
1009 ?->>'type' != 'Create' -- This isn't a Create
1010 OR ?->>'inReplyTo' is null -- this isn't a reply
1011 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1012 -- unless they are the author (because authors
1013 -- are also part of the recipients). This leads
1014 -- to a bug that self-replies by friends won't
1016 OR ? = ? -- The actor is us
1020 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1021 activity.recipients,
1029 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose JSON "type" is
# 'Announce'; any other opts leave the query untouched.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
1037 defp restrict_muted(query, %{with_muted: true}), do: query
1039 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1040 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1043 from([activity] in query,
1044 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1047 "not (?->'to' \\?| ?) or ? = ?",
1055 unless opts[:skip_preload] do
1056 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1062 defp restrict_muted(query, _), do: query
1064 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1065 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1066 domain_blocks = user.domain_blocks || []
1068 following_ap_ids = User.get_friends_ap_ids(user)
1071 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1074 [activity, object: o] in query,
1075 # You don't block the author
1076 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1078 # You don't block any recipients, and didn't author the post
1081 "((not (? && ?)) or ? = ?)",
1082 activity.recipients,
1088 # You don't block the domain of any recipients, and didn't author the post
1091 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1092 activity.recipients,
1098 # It's not a boost of a user you block
1101 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1107 # You don't block the author's domain, and also don't follow the author
1110 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1117 # Same as above, but checks the Object
1120 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1129 defp restrict_blocked(query, _), do: query
1131 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1132 if Config.get([:activitypub, :blockers_visible]) == true do
1135 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1139 # The author doesn't block you
1140 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1142 # It's not a boost of a user that blocks you
1145 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1154 defp restrict_blockers_visibility(query, _), do: query
1156 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1161 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1163 ^[Constants.as_public()]
1168 defp restrict_unlisted(query, _), do: query
1170 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1172 [activity, object: o] in query,
1175 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1184 defp restrict_pinned(query, _), do: query
1186 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1187 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1193 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1201 defp restrict_muted_reblogs(query, _), do: query
1203 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1206 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1210 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1213 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1217 defp restrict_instance(query, _), do: query
1219 defp restrict_filtered(query, %{user: %User{} = user}) do
1220 case Filter.compose_regex(user) do
1225 from([activity, object] in query,
1227 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1228 activity.actor == ^user.ap_id
1233 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1234 restrict_filtered(query, %{user: user})
1237 defp restrict_filtered(query, _), do: query
1239 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1241 defp exclude_poll_votes(query, _) do
1242 if has_named_binding?(query, :object) do
1243 from([activity, object: o] in query,
1244 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1251 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1253 defp exclude_invisible_actors(query, _opts) do
1255 User.Query.build(%{invisible: true, select: [:ap_id]})
1257 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1259 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# When opts carry a binary `:exclude_id`, removes the activity with that id
# from the results; otherwise returns the query unchanged.
defp exclude_id(query, %{exclude_id: skipped_id}) when is_binary(skipped_id) do
  where(query, [activity], activity.id != ^skipped_id)
end

defp exclude_id(query, _opts), do: query
1268 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1270 defp maybe_preload_objects(query, _) do
1272 |> Activity.with_preloaded_object()
1275 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1277 defp maybe_preload_bookmarks(query, opts) do
1279 |> Activity.with_preloaded_bookmark(opts[:user])
1282 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1284 |> Activity.with_preloaded_report_notes()
1287 defp maybe_preload_report_notes(query, _), do: query
1289 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1291 defp maybe_set_thread_muted_field(query, opts) do
1293 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1296 defp maybe_order(query, %{order: :desc}) do
1298 |> order_by(desc: :id)
1301 defp maybe_order(query, %{order: :asc}) do
1303 |> order_by(asc: :id)
1306 defp maybe_order(query, _), do: query
1308 defp normalize_fetch_activities_query_opts(opts) do
1309 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1311 value when is_bitstring(value) ->
1312 Map.put(opts, key, Hashtag.normalize_name(value))
1314 value when is_list(value) ->
1317 |> Enum.map(&Hashtag.normalize_name/1)
1320 Map.put(opts, key, normalized_value)
# Loads the ap_ids the requesting user blocks / mutes / reblog-mutes with a single call to
# User.outgoing_relationships_ap_ids/2 and returns three option maps, in this order:
#
#   {opts + :blocked_users_ap_ids, opts + :muted_users_ap_ids, opts + :reblog_muted_users_ap_ids}
#
# Mute relationships are requested only when `:muting_user` is set; block relationships only
# when `:blocking_user` is set and is the same user. A value already present in `opts` under
# one of the three keys is kept as it is.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_kinds = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_kinds =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_kinds]
    else
      mute_kinds
    end

  ap_ids = User.outgoing_relationships_ap_ids(muting_user, relationship_kinds)

  {
    Map.put_new(opts, :blocked_users_ap_ids, ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, ap_ids[:reblog_mute])
  }
end
# Builds the Ecto query that fetches activities, restricted to `recipients` and to whatever
# filters are selected in `opts` (a map, defaulting to `%{}`).
#
# The query is composed as one long pipeline: preloads and ordering first, then the
# restrict_* filters, then the exclude_* filters, and finally the hashtag filters.
1350 def fetch_activities_query(recipients, opts \\ %{}) do
# Hashtag options are normalized up front so the hashtag filters see the normalized values.
1351 opts = normalize_fetch_activities_query_opts(opts)
# Three option maps carrying the block / mute / reblog-mute ap_id lists; each one is handed
# to its matching restrict_* stage below instead of the plain `opts`.
1353 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1354 fetch_activities_query_ap_ids_ops(opts)
# Instance setting read once here and passed on to restrict_thread_visibility/3 as `config`.
1357 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
# Preloads and ordering.
1362 |> maybe_preload_objects(opts)
1363 |> maybe_preload_bookmarks(opts)
1364 |> maybe_preload_report_notes(opts)
1365 |> maybe_set_thread_muted_field(opts)
1366 |> maybe_order(opts)
# Filters.
1367 |> restrict_recipients(recipients, opts[:user])
1368 |> restrict_replies(opts)
1369 |> restrict_since(opts)
1370 |> restrict_local(opts)
1371 |> restrict_remote(opts)
1372 |> restrict_actor(opts)
1373 |> restrict_type(opts)
1374 |> restrict_state(opts)
1375 |> restrict_favorited_by(opts)
1376 |> restrict_blocked(restrict_blocked_opts)
1377 |> restrict_blockers_visibility(opts)
1378 |> restrict_muted(restrict_muted_opts)
# NOTE(review): restrict_filtered(opts) appears twice in this pipeline with identical
# arguments (here and again after restrict_announce_object_actor). Presumably one of
# the two is redundant — confirm and drop it.
1379 |> restrict_filtered(opts)
1380 |> restrict_media(opts)
1381 |> restrict_visibility(opts)
1382 |> restrict_thread_visibility(opts, config)
1383 |> restrict_reblogs(opts)
1384 |> restrict_pinned(opts)
1385 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1386 |> restrict_instance(opts)
1387 |> restrict_announce_object_actor(opts)
1388 |> restrict_filtered(opts)
1389 |> Activity.restrict_deactivated_users()
1390 |> exclude_poll_votes(opts)
1391 |> exclude_invisible_actors(opts)
1392 |> exclude_visibility(opts)
# Hashtag filtering: the restrict_hashtag_* stages are used when the
# :improved_hashtag_timeline feature is enabled; the restrict_embedded_tag_* stages
# that follow are presumably the alternative used when it is not.
1394 if Config.feature_enabled?(:improved_hashtag_timeline) do
1396 |> restrict_hashtag_any(opts)
1397 |> restrict_hashtag_all(opts)
1398 |> restrict_hashtag_reject_any(opts)
1401 |> restrict_embedded_tag_any(opts)
1402 |> restrict_embedded_tag_all(opts)
1403 |> restrict_embedded_tag_reject_any(opts)
1408 Fetches the activities favourited by a user, ordered by the id of the corresponding Like activity (descending).
# Returns a page of activities that `user` has favourited.
#
# `params` are pagination/filter params (default `%{}`); `pagination` is declared with a
# default of `:keyset`. Per the @spec the result is a list of activities.
1410 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1411 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
# Start from the "Like" activities selected by actor, then join the liked object and the
# activity belonging to that object.
1413 |> Activity.Queries.by_actor()
1414 |> Activity.Queries.by_type("Like")
1415 |> Activity.with_joined_object()
1416 |> Object.with_joined_activity()
# The joined activity is what gets returned; the Like's id is stored in :pagination_id so
# that paging follows the order in which things were favourited.
1417 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1418 |> order_by([like, _, _], desc_nulls_last: like.id)
# skip_order: true — the ordering has already been set explicitly by the order_by above.
1419 |> Pagination.fetch_paginated(
1420 Map.merge(params, %{skip_order: true}),
# For a non-empty `list_memberships`, prepends the user's ap_id to the "cc" of every
# activity whose non-empty "bcc" contains at least one element of `list_memberships`.
# The last clause returns `activities` unchanged for any other arguments
# (e.g. an empty membership list).
1425 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1426 Enum.map(activities, fn
1427 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1428 if Enum.any?(bcc, &(&1 in list_memberships)) do
# NOTE(review): if activity.data has no "cc" key the update function receives nil and
# builds an improper list ([user_ap_id | nil]) — confirm "cc" is always a list here.
1429 update_in(activity.data["cc"], &[user_ap_id | &1])
1439 defp maybe_update_cc(activities, _, _), do: activities
# Restricts `query` to activities that are either
#   * addressed to at least one of `recipients` (array overlap, `&&`), or
#   * addressed to at least one of `recipients_with_public` AND to the public collection.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
# Fetches a page of activities bounded by two recipient lists (see
# fetch_activities_bounded_query/3 for the exact condition applied to them).
# `pagination` defaults to :keyset.
1450 def fetch_activities_bounded(
1452 recipients_with_public,
1454 pagination \\ :keyset
# The base query is built with an empty recipient list; presumably this leaves the
# recipient restriction entirely to the bounded query added next — verify against
# restrict_recipients.
1456 fetch_activities_query([], opts)
1457 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1458 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Stores `file` through `Upload.store/2` and inserts an object for the stored upload.

When `opts[:actor]` is present it is recorded under the `"actor"` key of the object data.
Any result of `Upload.store/2` other than `{:ok, data}` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    not_stored ->
      not_stored
  end
end
# Extracts a single URL from an actor's "url" property (called with data["url"] by
# object_to_user_data/2). Per the @spec the result is a binary or nil.
1471 @spec get_actor_url(any()) :: binary() | nil
# A plain string is already the URL.
1472 defp get_actor_url(url) when is_binary(url), do: url
# A link-like map contributes its "href", provided that is a string.
1473 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
# A list of candidates has its own clause.
1475 defp get_actor_url(url) when is_list(url) do
# Anything else (nil, a map without a string "href", ...) yields nil.
1481 defp get_actor_url(_url), do: nil
# Normalizes an image property of an actor (used for data["image"] and data["icon"] in
# object_to_user_data/2).
# A map with a "url" is rewritten so that "url" holds a one-element list of
# %{"href" => url}.
1483 defp normalize_image(%{"url" => url}) do
1486 "url" => [%{"href" => url}]
# A list is reduced to its first element, which is normalized recursively; an empty
# list gives List.first/1 == nil and therefore ends in the nil clause below.
1490 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
# Anything else is treated as "no image".
1491 defp normalize_image(_), do: nil
# Converts a remote actor object (`data`, a string-keyed map) into the atom-keyed attribute
# map that is later fed to User.remote_user_changeset (see make_user_from_ap_id/2).
# `additional` may carry :nickname_from_acct, which takes precedence over a generated nickname.
1493 defp object_to_user_data(data, additional) do
# Profile fields: "attachment" entries of type "PropertyValue", reduced to "name"/"value".
1496 |> Map.get("attachment", [])
# NOTE(review): this function only matches maps that have a "type" key; an attachment
# without one (or a non-map entry) raises FunctionClauseError — confirm that remote data
# is validated before it gets here.
1497 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1498 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: built from "tag" entries of type "Emoji" as a map of
# name (surrounding colons trimmed) => icon URL.
1502 |> Map.get("tag", [])
1504 %{"type" => "Emoji"} -> true
1507 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1508 {String.trim(name, ":"), url}
# is_locked is read from the object as received; everything below is read after
# Transmogrifier.maybe_fix_user_object/1 has been applied.
1511 is_locked = data["manuallyApprovesFollowers"] || false
1512 data = Transmogrifier.maybe_fix_user_object(data)
1513 is_discoverable = data["discoverable"] || false
1514 invisible = data["invisible"] || false
1515 actor_type = data["type"] || "Person"
1517 featured_address = data["featured"]
# Assertive match: any return other than {:ok, _} raises MatchError here.
1518 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
# The PEM is only taken when "publicKey" is a map and "publicKeyPem" a string.
1521 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1522 data["publicKey"]["publicKeyPem"]
# Likewise the shared inbox is only taken from a well-formed "endpoints" map.
1526 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1527 data["endpoints"]["sharedInbox"]
1530 # if WebFinger request was already done, we probably have acct, otherwise
1531 # we request WebFinger here
1532 nickname = additional[:nickname_from_acct] || generate_nickname(data)
# Resulting attribute map.
1536 uri: get_actor_url(data["url"]),
1538 banner: normalize_image(data["image"]),
1541 is_locked: is_locked,
1542 is_discoverable: is_discoverable,
1543 invisible: invisible,
1544 avatar: normalize_image(data["icon"]),
1546 follower_address: data["followers"],
1547 following_address: data["following"],
1548 featured_address: featured_address,
1549 bio: data["summary"] || "",
1550 actor_type: actor_type,
1551 also_known_as: Map.get(data, "alsoKnownAs", []),
1552 public_key: public_key,
1553 inbox: data["inbox"],
1554 shared_inbox: shared_inbox,
1555 pinned_objects: pinned_objects,
# Derives a nickname for an actor that has a string "preferredUsername".
# The candidate is "<preferredUsername>@<host of the actor's id>".
1560 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1561 generated = "#{username}@#{URI.parse(data["id"]).host}"
# With [WebFinger, :update_nickname_on_user_fetch] enabled the candidate is looked up
# via WebFinger, and the acct found in the returned "subject" is used.
1563 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1564 case WebFinger.finger(generated) do
1565 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1573 # nickname can be nil because of virtual actors
1574 defp generate_nickname(_), do: nil
# Fetches the remote following and follower collections of `user` (read from
# user.following_address and user.follower_address) and derives follow information:
#   * hide_follows / hide_followers — from collection_private/1 on each collection
#   * following_count / follower_count — from each collection's "totalItems",
#     passed through normalize_counter/1
# An {:error, _} tuple from any step is returned unchanged.
1576 def fetch_follow_information_for_user(user) do
1577 with {:ok, following_data} <-
1578 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1579 {:ok, hide_follows} <- collection_private(following_data),
1580 {:ok, followers_data} <-
1581 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1582 {:ok, hide_followers} <- collection_private(followers_data) do
1585 hide_follows: hide_follows,
1586 follower_count: normalize_counter(followers_data["totalItems"]),
1587 following_count: normalize_counter(following_data["totalItems"]),
1588 hide_followers: hide_followers
1591 {:error, _} = e -> e
# Sanitizes a remote "totalItems" value: integers pass through unchanged,
# anything else (nil, strings, floats, ...) counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Adds remotely fetched follow information to `user_data` when
# [:instance, :external_user_synchronization] is enabled and the preconditions hold.
# Each precondition is wrapped in a tagged tuple so that the failure clauses further
# down can tell which one did not hold.
1599 def maybe_update_follow_information(user_data) do
1600 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# NOTE(review): object_to_user_data/2 in this module emits :actor_type, not :type —
# confirm that callers really provide a :type key, otherwise this check cannot pass.
1601 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
# Both collection addresses must be present; `!!` coerces the value to a strict
# boolean so it can be matched against `true`.
1603 {:collections_available,
1604 !!(user_data[:following_address] && user_data[:follower_address])},
1606 fetch_follow_information_for_user(user_data) do
# The fetched values are merged over any existing user_data[:info] and stored under :info.
1607 info = Map.merge(user_data[:info] || %{}, info)
1610 |> Map.put(:info, info)
# Failure clauses for the tagged preconditions above.
1612 {:user_type_check, false} ->
1615 {:collections_available, false} ->
1618 {:enabled, false} ->
# Message logged when the counter update itself failed.
1623 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a remote follower/following collection is private (hidden).
# Used by fetch_follow_information_for_user/1, which expects {:ok, boolean} on success.
#
# Clause 1: the "first" page is embedded inline as a CollectionPage / OrderedCollectionPage.
1630 defp collection_private(%{"first" => %{"type" => type}})
1631 when type in ["CollectionPage", "OrderedCollectionPage"],
# Clause 2: "first" is anything else (e.g. a reference), so the page is fetched and
# must turn out to be a CollectionPage / OrderedCollectionPage.
1634 defp collection_private(%{"first" => first}) do
1635 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1636 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# A 401/403 answer for the first page means the collection is private; other
# {:error, _} tuples are handed back to the caller unchanged.
1639 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1640 {:error, _} = e -> e
# Clause 3: a collection without a "first" page is treated as private.
1645 defp collection_private(_data), do: {:ok, true}
# Runs a raw actor object through MRF.filter/1 and, when the filter returns {:ok, data},
# converts the filtered object with object_to_user_data/2.
# `additional` is passed through to object_to_user_data/2 and defaults to [].
# Success is returned as {:ok, user_data}.
1647 def user_data_from_user_object(data, additional \\ []) do
1648 with {:ok, data} <- MRF.filter(data) do
1649 {:ok, object_to_user_data(data, additional)}
# Fetches the actor object at `ap_id`, converts it with user_data_from_user_object/2 and
# passes the result through maybe_update_follow_information/1.
# Returns {:ok, user_data} on success. Failures are logged at a level matching their cause:
# debug for deleted objects, info for {:reject, reason} errors, error for everything else.
1655 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1656 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1657 {:ok, data} <- user_data_from_user_object(data, additional) do
1658 {:ok, maybe_update_follow_information(data)}
1660 # If this has been deleted, only log a debug and not an error
1661 {:error, "Object has been deleted" = e} ->
1662 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Rejections carry a reason; only the reason is logged.
1665 {:error, {:reject, reason} = e} ->
1666 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
# Any other failure.
1670 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees the nickname in `data[:nickname]` if it is still held by a different, older user.
# Only acts when the nickname is a binary, a user with that nickname exists, and that
# user's ap_id differs from data[:ap_id].
1675 def maybe_handle_clashing_nickname(data) do
1676 with nickname when is_binary(nickname) <- data[:nickname],
1677 %User{} = old_user <- User.get_by_nickname(nickname),
# Tagged so the matching-ap_id case can be told apart in the failure clauses.
1678 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1680 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
# The old user is renamed to "<id>.<nickname>".
1684 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1685 |> User.update_and_set_cache()
# Same ap_id: the "old" user is the very user being created, so nothing is renamed.
1687 {:ap_id_comparison, true} ->
1689 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Builds the pinned-objects map (object ap_id => NaiveDateTime.utc_now()) from a
# "featured" collection.
#
# Clause 1: an OrderedCollection whose page is fetched via `first`; the page's
# "orderedItems" are turned into the map.
# NOTE(review): this clause only accepts map items with an "id" and assumes "orderedItems"
# is present, whereas the clause further down also accepts plain string ids — confirm
# whether string items or a missing key can occur on this path.
1697 def pin_data_from_featured_collection(%{
1698 "type" => "OrderedCollection",
1701 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1703 |> Map.get("orderedItems")
1704 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1707 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
# Clause 2: any other OrderedCollection / Collection; its items are obtained through
# Collections.Fetcher.fetch_collection/1 (assertive match: a non-{:ok, _} result raises).
1712 def pin_data_from_featured_collection(
1717 when type in ["OrderedCollection", "Collection"] do
1718 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1720 # Items can either be a map _or_ a string
1723 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1724 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
# Fetches an actor's "featured" collection and converts it into pinned-objects data.
# object_to_user_data/2 matches the result against {:ok, pinned_objects}.
#
# Clause for actors without a featured address.
1728 def fetch_and_prepare_featured_from_ap_id(nil) do
# Clause for a given address: fetch the collection and convert it with
# pin_data_from_featured_collection/1; a failed fetch is logged at error level.
1732 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1733 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1734 {:ok, pin_data_from_featured_collection(data)}
1737 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
# Makes sure the pinned objects of freshly fetched user data are available locally.
# Run in a separate task by make_user_from_ap_id/2. Nothing to do for nil.
1742 def pinned_fetch_task(nil), do: nil
# `pins` is enumerated as {ap_id, _} pairs (the pinned_objects map).
1744 def pinned_fetch_task(%{pinned_objects: pins}) do
# An object counts as available when it is already cached or can be fetched.
# Enum.all?/2 stops at the first failure, so pins after a failing one are not attempted.
1745 if Enum.all?(pins, fn {ap_id, _} ->
1746 Object.get_cached_by_ap_id(ap_id) ||
1747 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
# Creates or refreshes the local user for the remote actor at `ap_id`.
# `additional` is forwarded to fetch_and_prepare_user_from_ap_id/2
# (make_user_from_nickname/1 uses it to pass nickname_from_acct:).
1755 def make_user_from_ap_id(ap_id, additional \\ []) do
1756 user = User.get_cached_by_ap_id(ap_id)
# A cached user that is not AP-enabled is handed to the Transmogrifier upgrade path.
1758 if user && !User.ap_enabled?(user) do
1759 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1761 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
# Pinned objects are fetched in a separate, unlinked process (Task.start/1), so this
# function does not wait for them and is not affected if that task fails.
1762 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
# Presumably the existing-user path: apply the freshly fetched data and refresh the cache.
1766 |> User.remote_user_changeset(data)
1767 |> User.update_and_set_cache()
# Presumably the new-user path: first move any other user holding the same nickname
# out of the way, then build the changeset from the fetched data.
1769 maybe_handle_clashing_nickname(data)
1772 |> User.remote_user_changeset()
@doc """
Resolves `nickname` through WebFinger and creates/refreshes the user behind it.

The WebFinger result must contain a non-nil `"ap_id"` and a `"subject"` of the form
`"acct:<acct>"`; the acct part is passed on as `nickname_from_acct`. Any other
WebFinger result yields `{:error, "No AP id in WebFinger"}`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _no_usable_result ->
      {:error, "No AP id in WebFinger"}
  end
end
# Thread containment check used to filter out broken threads:
# delegates to entire_thread_visible_for_user?/2 for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processes a single activity on behalf of `user`.

At present the only step is the broken-thread containment check, whose result is returned.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1799 def fetch_direct_messages_query do
1801 |> restrict_type(%{type: "Create"})
1802 |> restrict_visibility(%{visibility: "direct"})
1803 |> order_by([activity], asc: activity.id)