1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
68 defp check_actor_can_insert(_), do: true
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
75 defp check_remote_limit(_), do: true
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
98 defp increase_replies_count_if_reply(_create_data), do: :noop
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
113 Repo.insert(%Activity{
116 recipients: recipients,
117 actor: object["actor"]
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
149 %Activity{} = activity ->
155 {:containment, _} = error ->
158 {:error, _} = error ->
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
166 recipients: recipients,
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
181 defp insert_activity_with_expiration(data, local, recipients) do
185 actor: data["actor"],
186 recipients: recipients
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
199 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
200 Activity.get_create_by_object_ap_id_with_object(id)
206 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
207 participations = get_participations(conversation)
209 stream_out_participations(participations)
212 defp maybe_create_activity_expiration(
213 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
216 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
217 activity_id: activity.id,
218 expires_at: expires_at
224 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
226 defp create_or_bump_conversation(activity, actor) do
227 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
228 %User{} = user <- User.get_cached_by_ap_id(actor) do
229 Participation.mark_as_read(user, conversation)
234 defp get_participations({:ok, conversation}) do
236 |> Repo.preload(:participations, force: true)
237 |> Map.get(:participations)
240 defp get_participations(_), do: []
242 def stream_out_participations(participations) do
245 |> Repo.preload(:user)
247 Streamer.stream("participation", participations)
251 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
252 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
253 conversation = Repo.preload(conversation, :participations)
256 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
261 if last_activity_id do
262 stream_out_participations(conversation.participations)
268 def stream_out_participations(_, _), do: :noop
271 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
272 when data_type in ["Create", "Announce", "Delete", "Update"] do
274 |> Topics.get_activity_topics()
275 |> Streamer.stream(activity)
279 def stream_out(_activity) do
283 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
284 def create(params, fake \\ false) do
285 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
290 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
291 additional = params[:additional] || %{}
292 # only accept false as false value
293 local = !(params[:local] == false)
294 published = params[:published]
295 quick_insert? = Config.get([:env]) == :benchmark
299 %{to: to, actor: actor, published: published, context: context, object: object},
303 with {:ok, activity} <- insert(create_data, local, fake),
304 {:fake, false, activity} <- {:fake, fake, activity},
305 _ <- increase_replies_count_if_reply(create_data),
306 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
307 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
308 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
309 _ <- notify_and_stream(activity),
310 :ok <- maybe_schedule_poll_notifications(activity),
311 :ok <- maybe_federate(activity) do
314 {:quick_insert, true, activity} ->
317 {:fake, true, activity} ->
321 Repo.rollback(message)
325 defp maybe_schedule_poll_notifications(activity) do
326 PollWorker.schedule_poll_end(activity)
330 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
331 {:ok, Activity.t()} | nil | {:error, any()}
332 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
333 with {:ok, result} <-
334 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
339 defp do_unfollow(follower, followed, activity_id, local)
341 defp do_unfollow(follower, followed, activity_id, local) when local == true do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
344 {:ok, activity} <- insert(unfollow_data, local),
345 {:ok, _activity} <- Repo.delete(follow_activity),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
351 {:error, error} -> Repo.rollback(error)
355 defp do_unfollow(follower, followed, activity_id, false) do
356 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
357 # uses deterministic ids for follows.
358 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
359 {:ok, _activity} <- Repo.delete(follow_activity),
360 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
361 unfollow_activity <- make_unfollow_activity(unfollow_data, false),
362 _ <- notify_and_stream(unfollow_activity) do
363 {:ok, unfollow_activity}
366 {:error, error} -> Repo.rollback(error)
370 defp make_unfollow_activity(data, local) do
371 {recipients, _, _} = get_recipients(data)
376 actor: data["actor"],
377 recipients: recipients
381 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
383 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
397 # only accept false as false value
398 local = !(params[:local] == false)
399 forward = !(params[:forward] == false)
401 additional = params[:additional] || %{}
405 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
407 Map.merge(additional, %{"to" => [], "cc" => []})
410 with flag_data <- make_flag_data(params, additional),
411 {:ok, activity} <- insert(flag_data, local),
412 {:ok, stripped_activity} <- strip_report_status_data(activity),
413 _ <- notify_and_stream(activity),
415 maybe_federate(stripped_activity) do
416 User.all_superusers()
417 |> Enum.filter(fn user -> user.ap_id != actor end)
418 |> Enum.filter(fn user -> not is_nil(user.email) end)
419 |> Enum.each(fn superuser ->
421 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
422 |> Pleroma.Emails.Mailer.deliver_async()
427 {:error, error} -> Repo.rollback(error)
431 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
432 def move(%User{} = origin, %User{} = target, local \\ true) do
435 "actor" => origin.ap_id,
436 "object" => origin.ap_id,
437 "target" => target.ap_id,
438 "to" => [origin.follower_address]
441 with true <- origin.ap_id in target.also_known_as,
442 {:ok, activity} <- insert(params, local),
443 _ <- notify_and_stream(activity) do
444 maybe_federate(activity)
446 BackgroundWorker.enqueue("move_following", %{
447 "origin_id" => origin.id,
448 "target_id" => target.id
453 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
458 def fetch_activities_for_context_query(context, opts) do
459 public = [Constants.as_public()]
463 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
466 from(activity in Activity)
467 |> maybe_preload_objects(opts)
468 |> maybe_preload_bookmarks(opts)
469 |> maybe_set_thread_muted_field(opts)
470 |> restrict_blocked(opts)
471 |> restrict_blockers_visibility(opts)
472 |> restrict_recipients(recipients, opts[:user])
473 |> restrict_filtered(opts)
477 "?->>'type' = ? and ?->>'context' = ?",
484 |> exclude_poll_votes(opts)
486 |> order_by([activity], desc: activity.id)
489 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
490 def fetch_activities_for_context(context, opts \\ %{}) do
492 |> fetch_activities_for_context_query(opts)
496 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
497 FlakeId.Ecto.CompatType.t() | nil
498 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
500 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
501 |> restrict_visibility(%{visibility: "direct"})
507 defp fetch_paginated_optimized(query, opts, pagination) do
508 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
509 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
510 opts = Map.put(opts, :skip_extra_order, true)
512 Pagination.fetch_paginated(query, opts, pagination)
515 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
516 list_memberships = Pleroma.List.memberships(opts[:user])
518 fetch_activities_query(recipients ++ list_memberships, opts)
519 |> fetch_paginated_optimized(opts, pagination)
521 |> maybe_update_cc(list_memberships, opts[:user])
524 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
525 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
526 includes_local_public = Map.get(opts, :includes_local_public, false)
528 opts = Map.delete(opts, :user)
530 intended_recipients =
531 if includes_local_public do
532 [Constants.as_public(), as_local_public()]
534 [Constants.as_public()]
538 |> fetch_activities_query(opts)
539 |> restrict_unlisted(opts)
540 |> fetch_paginated_optimized(opts, pagination)
543 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
544 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
546 |> Map.put(:restrict_unlisted, true)
547 |> fetch_public_or_unlisted_activities(pagination)
550 @valid_visibilities ~w[direct unlisted public private]
552 defp restrict_visibility(query, %{visibility: visibility})
553 when is_list(visibility) do
554 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
559 "activity_visibility(?, ?, ?) = ANY (?)",
567 Logger.error("Could not restrict visibility to #{visibility}")
571 defp restrict_visibility(query, %{visibility: visibility})
572 when visibility in @valid_visibilities do
576 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
580 defp restrict_visibility(_query, %{visibility: visibility})
581 when visibility not in @valid_visibilities do
582 Logger.error("Could not restrict visibility to #{visibility}")
585 defp restrict_visibility(query, _visibility), do: query
587 defp exclude_visibility(query, %{exclude_visibilities: visibility})
588 when is_list(visibility) do
589 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
594 "activity_visibility(?, ?, ?) = ANY (?)",
602 Logger.error("Could not exclude visibility to #{visibility}")
607 defp exclude_visibility(query, %{exclude_visibilities: visibility})
608 when visibility in @valid_visibilities do
613 "activity_visibility(?, ?, ?) = ?",
622 defp exclude_visibility(query, %{exclude_visibilities: visibility})
623 when visibility not in [nil | @valid_visibilities] do
624 Logger.error("Could not exclude visibility to #{visibility}")
628 defp exclude_visibility(query, _visibility), do: query
630 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
633 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
636 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
637 local_public = as_local_public()
641 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
645 defp restrict_thread_visibility(query, _, _), do: query
647 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
650 |> Map.put(:user, reading_user)
651 |> Map.put(:actor_id, user.ap_id)
654 godmode: params[:godmode],
655 reading_user: reading_user
657 |> user_activities_recipients()
658 |> fetch_activities(params)
662 def fetch_user_activities(user, reading_user, params \\ %{})
664 def fetch_user_activities(user, reading_user, %{total: true} = params) do
665 result = fetch_activities_for_user(user, reading_user, params)
667 Keyword.put(result, :items, Enum.reverse(result[:items]))
670 def fetch_user_activities(user, reading_user, params) do
672 |> fetch_activities_for_user(reading_user, params)
676 defp fetch_activities_for_user(user, reading_user, params) do
679 |> Map.put(:type, ["Create", "Announce"])
680 |> Map.put(:user, reading_user)
681 |> Map.put(:actor_id, user.ap_id)
682 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
685 if User.blocks?(reading_user, user) do
689 |> Map.put(:blocking_user, reading_user)
690 |> Map.put(:muting_user, reading_user)
693 pagination_type = Map.get(params, :pagination_type) || :keyset
696 godmode: params[:godmode],
697 reading_user: reading_user
699 |> user_activities_recipients()
700 |> fetch_activities(params, pagination_type)
703 def fetch_statuses(reading_user, %{total: true} = params) do
704 result = fetch_activities_for_reading_user(reading_user, params)
705 Keyword.put(result, :items, Enum.reverse(result[:items]))
708 def fetch_statuses(reading_user, params) do
710 |> fetch_activities_for_reading_user(params)
714 defp fetch_activities_for_reading_user(reading_user, params) do
715 params = Map.put(params, :type, ["Create", "Announce"])
718 godmode: params[:godmode],
719 reading_user: reading_user
721 |> user_activities_recipients()
722 |> fetch_activities(params, :offset)
725 defp user_activities_recipients(%{godmode: true}), do: []
727 defp user_activities_recipients(%{reading_user: reading_user}) do
728 if not is_nil(reading_user) and reading_user.local do
730 Constants.as_public(),
732 reading_user.ap_id | User.following(reading_user)
735 [Constants.as_public()]
739 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
740 raise "Can't use the child object without preloading!"
743 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
745 [activity, object] in query,
748 "?->>'type' != ? or ?->>'actor' != ?",
757 defp restrict_announce_object_actor(query, _), do: query
759 defp restrict_since(query, %{since_id: ""}), do: query
761 defp restrict_since(query, %{since_id: since_id}) do
762 from(activity in query, where: activity.id > ^since_id)
765 defp restrict_since(query, _), do: query
767 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
768 raise_on_missing_preload()
771 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
773 [_activity, object] in query,
774 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
778 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
779 restrict_embedded_tag_any(query, %{tag: tag})
782 defp restrict_embedded_tag_all(query, _), do: query
784 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
785 raise_on_missing_preload()
788 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
790 [_activity, object] in query,
791 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
795 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
796 restrict_embedded_tag_any(query, %{tag: [tag]})
799 defp restrict_embedded_tag_any(query, _), do: query
801 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
802 raise_on_missing_preload()
805 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
807 [_activity, object] in query,
808 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
812 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
813 when is_binary(tag_reject) do
814 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
817 defp restrict_embedded_tag_reject_any(query, _), do: query
819 defp object_ids_query_for_tags(tags) do
820 from(hto in "hashtags_objects")
821 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
822 |> where([hto, ht], ht.name in ^tags)
823 |> select([hto], hto.object_id)
824 |> distinct([hto], true)
827 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
828 raise_on_missing_preload()
831 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
832 restrict_hashtag_any(query, %{tag: single_tag})
835 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
837 [_activity, object] in query,
841 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
842 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
843 AND hashtags_objects.object_id = ?) @> ?
852 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
853 restrict_hashtag_all(query, %{tag_all: [tag]})
856 defp restrict_hashtag_all(query, _), do: query
858 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
859 raise_on_missing_preload()
862 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
864 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
867 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
869 [_activity, object] in query,
870 join: hto in "hashtags_objects",
871 on: hto.object_id == object.id,
872 where: hto.hashtag_id in ^hashtag_ids,
873 distinct: [desc: object.id],
874 order_by: [desc: object.id]
878 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
879 restrict_hashtag_any(query, %{tag: [tag]})
882 defp restrict_hashtag_any(query, _), do: query
884 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
885 raise_on_missing_preload()
888 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
890 [_activity, object] in query,
891 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
895 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
896 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
899 defp restrict_hashtag_reject_any(query, _), do: query
901 defp raise_on_missing_preload do
902 raise "Can't use the child object without preloading!"
905 defp restrict_recipients(query, [], _user), do: query
907 defp restrict_recipients(query, recipients, nil) do
908 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
911 defp restrict_recipients(query, recipients, user) do
914 where: fragment("? && ?", ^recipients, activity.recipients),
915 or_where: activity.actor == ^user.ap_id
919 defp restrict_local(query, %{local_only: true}) do
920 from(activity in query, where: activity.local == true)
923 defp restrict_local(query, _), do: query
925 defp restrict_remote(query, %{remote: true}) do
926 from(activity in query, where: activity.local == false)
929 defp restrict_remote(query, _), do: query
931 defp restrict_actor(query, %{actor_id: actor_id}) do
932 from(activity in query, where: activity.actor == ^actor_id)
935 defp restrict_actor(query, _), do: query
937 defp restrict_type(query, %{type: type}) when is_binary(type) do
938 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
941 defp restrict_type(query, %{type: type}) do
942 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
945 defp restrict_type(query, _), do: query
947 defp restrict_state(query, %{state: state}) do
948 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
951 defp restrict_state(query, _), do: query
953 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
955 [_activity, object] in query,
956 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
960 defp restrict_favorited_by(query, _), do: query
962 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
963 raise "Can't use the child object without preloading!"
966 defp restrict_media(query, %{only_media: true}) do
968 [activity, object] in query,
969 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
970 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
974 defp restrict_media(query, _), do: query
976 defp restrict_replies(query, %{exclude_replies: true}) do
978 [_activity, object] in query,
979 where: fragment("?->>'inReplyTo' is null", object.data)
983 defp restrict_replies(query, %{
984 reply_filtering_user: %User{} = user,
985 reply_visibility: "self"
988 [activity, object] in query,
991 "?->>'inReplyTo' is null OR ? = ANY(?)",
999 defp restrict_replies(query, %{
1000 reply_filtering_user: %User{} = user,
1001 reply_visibility: "following"
1004 [activity, object] in query,
1008 ?->>'type' != 'Create' -- This isn't a Create
1009 OR ?->>'inReplyTo' is null -- this isn't a reply
1010 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1011 -- unless they are the author (because authors
1012 -- are also part of the recipients). This leads
1013 -- to a bug that self-replies by friends won't
1015 OR ? = ? -- The actor is us
1019 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1020 activity.recipients,
1028 defp restrict_replies(query, _), do: query
1030 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1031 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1034 defp restrict_reblogs(query, _), do: query
1036 defp restrict_muted(query, %{with_muted: true}), do: query
1038 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1039 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1042 from([activity] in query,
1043 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1046 "not (?->'to' \\?| ?) or ? = ?",
1054 unless opts[:skip_preload] do
1055 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1061 defp restrict_muted(query, _), do: query
1063 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1064 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1065 domain_blocks = user.domain_blocks || []
1067 following_ap_ids = User.get_friends_ap_ids(user)
1070 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1073 [activity, object: o] in query,
1074 # You don't block the author
1075 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1077 # You don't block any recipients, and didn't author the post
1080 "((not (? && ?)) or ? = ?)",
1081 activity.recipients,
1087 # You don't block the domain of any recipients, and didn't author the post
1090 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1091 activity.recipients,
1097 # It's not a boost of a user you block
1100 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1106 # You don't block the author's domain, and also don't follow the author
1109 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1116 # Same as above, but checks the Object
1119 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1128 defp restrict_blocked(query, _), do: query
1130 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1131 if Config.get([:activitypub, :blockers_visible]) == true do
1134 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1138 # The author doesn't block you
1139 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1141 # It's not a boost of a user that blocks you
1144 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1153 defp restrict_blockers_visibility(query, _), do: query
1155 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1160 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1162 ^[Constants.as_public()]
1167 defp restrict_unlisted(query, _), do: query
1169 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1171 [activity, object: o] in query,
1174 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1183 defp restrict_pinned(query, _), do: query
1185 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1186 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1192 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1200 defp restrict_muted_reblogs(query, _), do: query
1202 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1205 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1209 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1212 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1216 defp restrict_instance(query, _), do: query
1218 defp restrict_filtered(query, %{user: %User{} = user}) do
1219 case Filter.compose_regex(user) do
1224 from([activity, object] in query,
1226 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1227 activity.actor == ^user.ap_id
1232 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1233 restrict_filtered(query, %{user: user})
1236 defp restrict_filtered(query, _), do: query
1238 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1240 defp exclude_poll_votes(query, _) do
1241 if has_named_binding?(query, :object) do
1242 from([activity, object: o] in query,
1243 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1250 defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1251 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1253 defp exclude_invisible_actors(query, _opts) do
1255 User.Query.build(%{invisible: true, select: [:ap_id]})
1257 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1259 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1262 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1263 from(activity in query, where: activity.id != ^id)
1266 defp exclude_id(query, _), do: query
1268 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1270 defp maybe_preload_objects(query, _) do
1272 |> Activity.with_preloaded_object()
1275 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1277 defp maybe_preload_bookmarks(query, opts) do
1279 |> Activity.with_preloaded_bookmark(opts[:user])
1282 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1284 |> Activity.with_preloaded_report_notes()
1287 defp maybe_preload_report_notes(query, _), do: query
1289 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1291 defp maybe_set_thread_muted_field(query, opts) do
1293 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1296 defp maybe_order(query, %{order: :desc}) do
1298 |> order_by(desc: :id)
1301 defp maybe_order(query, %{order: :asc}) do
1303 |> order_by(asc: :id)
1306 defp maybe_order(query, _), do: query
1308 defp normalize_fetch_activities_query_opts(opts) do
1309 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1311 value when is_bitstring(value) ->
1312 Map.put(opts, key, Hashtag.normalize_name(value))
1314 value when is_list(value) ->
1317 |> Enum.map(&Hashtag.normalize_name/1)
1320 Map.put(opts, key, normalized_value)
1328 defp fetch_activities_query_ap_ids_ops(opts) do
1329 source_user = opts[:muting_user]
1330 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1332 ap_id_relationships =
1333 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1334 [:block | ap_id_relationships]
1339 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1341 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1342 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1344 restrict_muted_reblogs_opts =
1345 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1347 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1350 def fetch_activities_query(recipients, opts \\ %{}) do
1351 opts = normalize_fetch_activities_query_opts(opts)
1353 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1354 fetch_activities_query_ap_ids_ops(opts)
1357 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1362 |> maybe_preload_objects(opts)
1363 |> maybe_preload_bookmarks(opts)
1364 |> maybe_preload_report_notes(opts)
1365 |> maybe_set_thread_muted_field(opts)
1366 |> maybe_order(opts)
1367 |> restrict_recipients(recipients, opts[:user])
1368 |> restrict_replies(opts)
1369 |> restrict_since(opts)
1370 |> restrict_local(opts)
1371 |> restrict_remote(opts)
1372 |> restrict_actor(opts)
1373 |> restrict_type(opts)
1374 |> restrict_state(opts)
1375 |> restrict_favorited_by(opts)
1376 |> restrict_blocked(restrict_blocked_opts)
1377 |> restrict_blockers_visibility(opts)
1378 |> restrict_muted(restrict_muted_opts)
1379 |> restrict_filtered(opts)
1380 |> restrict_media(opts)
1381 |> restrict_visibility(opts)
1382 |> restrict_thread_visibility(opts, config)
1383 |> restrict_reblogs(opts)
1384 |> restrict_pinned(opts)
1385 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1386 |> restrict_instance(opts)
1387 |> restrict_announce_object_actor(opts)
1388 |> restrict_filtered(opts)
1389 |> maybe_restrict_deactivated_users(opts)
1390 |> exclude_poll_votes(opts)
1391 |> exclude_invisible_actors(opts)
1392 |> exclude_visibility(opts)
1394 if Config.feature_enabled?(:improved_hashtag_timeline) do
1396 |> restrict_hashtag_any(opts)
1397 |> restrict_hashtag_all(opts)
1398 |> restrict_hashtag_reject_any(opts)
1401 |> restrict_embedded_tag_any(opts)
1402 |> restrict_embedded_tag_all(opts)
1403 |> restrict_embedded_tag_reject_any(opts)
1408 Fetch favorites activities of user with order by sort adds to favorites
1410 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1411 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1413 |> Activity.Queries.by_actor()
1414 |> Activity.Queries.by_type("Like")
1415 |> Activity.with_joined_object()
1416 |> Object.with_joined_activity()
1417 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1418 |> order_by([like, _, _], desc_nulls_last: like.id)
1419 |> Pagination.fetch_paginated(
1420 Map.merge(params, %{skip_order: true}),
1425 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1426 Enum.map(activities, fn
1427 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1428 if Enum.any?(bcc, &(&1 in list_memberships)) do
1429 update_in(activity.data["cc"], &[user_ap_id | &1])
1439 defp maybe_update_cc(activities, _, _), do: activities
1441 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1442 from(activity in query,
1444 fragment("? && ?", activity.recipients, ^recipients) or
1445 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1446 ^Constants.as_public() in activity.recipients)
1450 def fetch_activities_bounded(
1452 recipients_with_public,
1454 pagination \\ :keyset
1456 fetch_activities_query([], opts)
1457 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1458 |> Pagination.fetch_paginated(opts, pagination)
1462 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1463 def upload(file, opts \\ []) do
1464 with {:ok, data} <- Upload.store(file, opts) do
1465 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1467 Repo.insert(%Object{data: obj_data})
1471 @spec get_actor_url(any()) :: binary() | nil
1472 defp get_actor_url(url) when is_binary(url), do: url
1473 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1475 defp get_actor_url(url) when is_list(url) do
1481 defp get_actor_url(_url), do: nil
1483 defp normalize_image(%{"url" => url}) do
1486 "url" => [%{"href" => url}]
1490 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1491 defp normalize_image(_), do: nil
1493 defp object_to_user_data(data, additional) do
1496 |> Map.get("attachment", [])
1497 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1498 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1502 |> Map.get("tag", [])
1504 %{"type" => "Emoji"} -> true
1507 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1508 {String.trim(name, ":"), url}
1511 is_locked = data["manuallyApprovesFollowers"] || false
1512 data = Transmogrifier.maybe_fix_user_object(data)
1513 is_discoverable = data["discoverable"] || false
1514 invisible = data["invisible"] || false
1515 actor_type = data["type"] || "Person"
1517 featured_address = data["featured"]
1518 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1521 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1522 data["publicKey"]["publicKeyPem"]
1526 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1527 data["endpoints"]["sharedInbox"]
1530 # if WebFinger request was already done, we probably have acct, otherwise
1531 # we request WebFinger here
1532 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1534 # also_known_as must be a URL
1537 |> Map.get("alsoKnownAs", [])
1538 |> Enum.filter(fn url ->
1539 case URI.parse(url) do
1540 %URI{scheme: "http"} -> true
1541 %URI{scheme: "https"} -> true
1548 uri: get_actor_url(data["url"]),
1550 banner: normalize_image(data["image"]),
1553 is_locked: is_locked,
1554 is_discoverable: is_discoverable,
1555 invisible: invisible,
1556 avatar: normalize_image(data["icon"]),
1558 follower_address: data["followers"],
1559 following_address: data["following"],
1560 featured_address: featured_address,
1561 bio: data["summary"] || "",
1562 actor_type: actor_type,
1563 also_known_as: also_known_as,
1564 public_key: public_key,
1565 inbox: data["inbox"],
1566 shared_inbox: shared_inbox,
1567 pinned_objects: pinned_objects,
1572 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1573 generated = "#{username}@#{URI.parse(data["id"]).host}"
1575 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1576 case WebFinger.finger(generated) do
1577 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1585 # nickname can be nil because of virtual actors
1586 defp generate_nickname(_), do: nil
1588 def fetch_follow_information_for_user(user) do
1589 with {:ok, following_data} <-
1590 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1591 {:ok, hide_follows} <- collection_private(following_data),
1592 {:ok, followers_data} <-
1593 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1594 {:ok, hide_followers} <- collection_private(followers_data) do
1597 hide_follows: hide_follows,
1598 follower_count: normalize_counter(followers_data["totalItems"]),
1599 following_count: normalize_counter(following_data["totalItems"]),
1600 hide_followers: hide_followers
1603 {:error, _} = e -> e
1608 defp normalize_counter(counter) when is_integer(counter), do: counter
1609 defp normalize_counter(_), do: 0
1611 def maybe_update_follow_information(user_data) do
1612 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1613 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1615 {:collections_available,
1616 !!(user_data[:following_address] && user_data[:follower_address])},
1618 fetch_follow_information_for_user(user_data) do
1619 info = Map.merge(user_data[:info] || %{}, info)
1622 |> Map.put(:info, info)
1624 {:user_type_check, false} ->
1627 {:collections_available, false} ->
1630 {:enabled, false} ->
1635 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1642 defp collection_private(%{"first" => %{"type" => type}})
1643 when type in ["CollectionPage", "OrderedCollectionPage"],
1646 defp collection_private(%{"first" => first}) do
1647 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1648 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1651 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1652 {:error, _} = e -> e
1657 defp collection_private(_data), do: {:ok, true}
1659 def user_data_from_user_object(data, additional \\ []) do
1660 with {:ok, data} <- MRF.filter(data) do
1661 {:ok, object_to_user_data(data, additional)}
1667 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1668 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1669 {:ok, data} <- user_data_from_user_object(data, additional) do
1670 {:ok, maybe_update_follow_information(data)}
1672 # If this has been deleted, only log a debug and not an error
1673 {:error, "Object has been deleted" = e} ->
1674 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1677 {:error, {:reject, reason} = e} ->
1678 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1682 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1687 def maybe_handle_clashing_nickname(data) do
1688 with nickname when is_binary(nickname) <- data[:nickname],
1689 %User{} = old_user <- User.get_by_nickname(nickname),
1690 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1692 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1696 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1697 |> User.update_and_set_cache()
1699 {:ap_id_comparison, true} ->
1701 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1709 def pin_data_from_featured_collection(%{
1710 "type" => "OrderedCollection",
1713 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1715 |> Map.get("orderedItems")
1716 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1719 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1724 def pin_data_from_featured_collection(
1729 when type in ["OrderedCollection", "Collection"] do
1730 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1732 # Items can either be a map _or_ a string
1735 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1736 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1740 def fetch_and_prepare_featured_from_ap_id(nil) do
1744 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1745 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1746 {:ok, pin_data_from_featured_collection(data)}
1749 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1754 def pinned_fetch_task(nil), do: nil
1756 def pinned_fetch_task(%{pinned_objects: pins}) do
1757 if Enum.all?(pins, fn {ap_id, _} ->
1758 Object.get_cached_by_ap_id(ap_id) ||
1759 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1767 def make_user_from_ap_id(ap_id, additional \\ []) do
1768 user = User.get_cached_by_ap_id(ap_id)
1770 if user && !User.ap_enabled?(user) do
1771 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1773 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1774 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1778 |> User.remote_user_changeset(data)
1779 |> User.update_and_set_cache()
1781 maybe_handle_clashing_nickname(data)
1784 |> User.remote_user_changeset()
1792 def make_user_from_nickname(nickname) do
1793 with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1794 WebFinger.finger(nickname) do
1795 make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1797 _e -> {:error, "No AP id in WebFinger"}
1801 # filter out broken threads
1802 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1803 entire_thread_visible_for_user?(activity, user)
1806 # do post-processing on a specific activity
1807 def contain_activity(%Activity{} = activity, %User{} = user) do
1808 contain_broken_threads(activity, user)
1811 def fetch_direct_messages_query do
1813 |> restrict_type(%{type: "Create"})
1814 |> restrict_visibility(%{visibility: "direct"})
1815 |> order_by([activity], asc: activity.id)
1818 defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
1820 defp maybe_restrict_deactivated_users(activity, _opts),
1821 do: Activity.restrict_deactivated_users(activity)