1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39 defp get_recipients(%{"type" => "Create"} = data) do
40 to = Map.get(data, "to", [])
41 cc = Map.get(data, "cc", [])
42 bcc = Map.get(data, "bcc", [])
43 actor = Map.get(data, "actor", [])
44 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
48 defp get_recipients(data) do
49 to = Map.get(data, "to", [])
50 cc = Map.get(data, "cc", [])
51 bcc = Map.get(data, "bcc", [])
52 recipients = Enum.concat([to, cc, bcc])
56 defp check_actor_is_active(nil), do: true
58 defp check_actor_is_active(actor) when is_binary(actor) do
59 case User.get_cached_by_ap_id(actor) do
60 %User{is_active: true} -> true
65 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
66 limit = Config.get([:instance, :remote_limit])
67 String.length(content) <= limit
70 defp check_remote_limit(_), do: true
72 def increase_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
76 def decrease_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
80 defp increase_replies_count_if_reply(%{
81 "object" => %{"inReplyTo" => reply_ap_id} = object,
84 if is_public?(object) do
85 Object.increase_replies_count(reply_ap_id)
89 defp increase_replies_count_if_reply(_create_data), do: :noop
91 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note]
93 def persist(%{"type" => type} = object, meta) when type in @object_types do
94 with {:ok, object} <- Object.create(object) do
100 def persist(object, meta) do
101 with local <- Keyword.fetch!(meta, :local),
102 {recipients, _, _} <- get_recipients(object),
104 Repo.insert(%Activity{
107 recipients: recipients,
108 actor: object["actor"]
110 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
111 {:ok, _} <- maybe_create_activity_expiration(activity) do
112 {:ok, activity, meta}
116 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
117 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
118 with nil <- Activity.normalize(map),
119 map <- lazy_put_activity_defaults(map, fake),
120 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
121 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
122 {:ok, map} <- MRF.filter(map),
123 {recipients, _, _} = get_recipients(map),
124 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
125 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
126 {:ok, map, object} <- insert_full_object(map),
127 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
128 # Splice in the child object if we have one.
129 activity = Maps.put_if_present(activity, :object, object)
131 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
132 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
137 %Activity{} = activity ->
143 {:containment, _} = error ->
146 {:error, _} = error ->
149 {:fake, true, map, recipients} ->
150 activity = %Activity{
154 recipients: recipients,
158 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
161 {:remote_limit_pass, _} ->
162 {:error, :remote_limit}
169 defp insert_activity_with_expiration(data, local, recipients) do
173 actor: data["actor"],
174 recipients: recipients
177 with {:ok, activity} <- Repo.insert(struct) do
178 maybe_create_activity_expiration(activity)
182 def notify_and_stream(activity) do
183 Notification.create_notifications(activity)
185 conversation = create_or_bump_conversation(activity, activity.actor)
186 participations = get_participations(conversation)
188 stream_out_participations(participations)
191 defp maybe_create_activity_expiration(
192 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
195 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
196 activity_id: activity.id,
197 expires_at: expires_at
203 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
205 defp create_or_bump_conversation(activity, actor) do
206 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
207 %User{} = user <- User.get_cached_by_ap_id(actor) do
208 Participation.mark_as_read(user, conversation)
213 defp get_participations({:ok, conversation}) do
215 |> Repo.preload(:participations, force: true)
216 |> Map.get(:participations)
219 defp get_participations(_), do: []
221 def stream_out_participations(participations) do
224 |> Repo.preload(:user)
226 Streamer.stream("participation", participations)
230 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
231 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
232 conversation = Repo.preload(conversation, :participations)
235 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
240 if last_activity_id do
241 stream_out_participations(conversation.participations)
247 def stream_out_participations(_, _), do: :noop
250 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
251 when data_type in ["Create", "Announce", "Delete"] do
253 |> Topics.get_activity_topics()
254 |> Streamer.stream(activity)
258 def stream_out(_activity) do
262 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
263 def create(params, fake \\ false) do
264 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
269 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
274 quick_insert? = Config.get([:env]) == :benchmark
278 %{to: to, actor: actor, published: published, context: context, object: object},
282 with {:ok, activity} <- insert(create_data, local, fake),
283 {:fake, false, activity} <- {:fake, fake, activity},
284 _ <- increase_replies_count_if_reply(create_data),
285 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
286 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
287 _ <- notify_and_stream(activity),
288 :ok <- maybe_federate(activity) do
291 {:quick_insert, true, activity} ->
294 {:fake, true, activity} ->
298 Repo.rollback(message)
302 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
303 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
304 additional = params[:additional] || %{}
305 # only accept false as false value
306 local = !(params[:local] == false)
307 published = params[:published]
311 %{to: to, actor: actor, published: published, context: context, object: object},
315 with {:ok, activity} <- insert(listen_data, local),
316 _ <- notify_and_stream(activity),
317 :ok <- maybe_federate(activity) do
322 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
323 {:ok, Activity.t()} | nil | {:error, any()}
324 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
325 with {:ok, result} <-
326 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
331 defp do_unfollow(follower, followed, activity_id, local) do
332 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
333 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
334 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
335 {:ok, activity} <- insert(unfollow_data, local),
336 _ <- notify_and_stream(activity),
337 :ok <- maybe_federate(activity) do
341 {:error, error} -> Repo.rollback(error)
345 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
347 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
361 # only accept false as false value
362 local = !(params[:local] == false)
363 forward = !(params[:forward] == false)
365 additional = params[:additional] || %{}
369 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
371 Map.merge(additional, %{"to" => [], "cc" => []})
374 with flag_data <- make_flag_data(params, additional),
375 {:ok, activity} <- insert(flag_data, local),
376 {:ok, stripped_activity} <- strip_report_status_data(activity),
377 _ <- notify_and_stream(activity),
379 maybe_federate(stripped_activity) do
380 User.all_superusers()
381 |> Enum.filter(fn user -> user.ap_id != actor end)
382 |> Enum.filter(fn user -> not is_nil(user.email) end)
383 |> Enum.each(fn superuser ->
385 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
386 |> Pleroma.Emails.Mailer.deliver_async()
391 {:error, error} -> Repo.rollback(error)
395 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
396 def move(%User{} = origin, %User{} = target, local \\ true) do
399 "actor" => origin.ap_id,
400 "object" => origin.ap_id,
401 "target" => target.ap_id
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_recipients(recipients, opts[:user])
435 |> restrict_filtered(opts)
439 "?->>'type' = ? and ?->>'context' = ?",
446 |> exclude_poll_votes(opts)
448 |> order_by([activity], desc: activity.id)
451 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
452 def fetch_activities_for_context(context, opts \\ %{}) do
454 |> fetch_activities_for_context_query(opts)
458 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
459 FlakeId.Ecto.CompatType.t() | nil
460 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
462 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
463 |> restrict_visibility(%{visibility: "direct"})
469 defp fetch_paginated_optimized(query, opts, pagination) do
470 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
471 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
472 opts = Map.put(opts, :skip_extra_order, true)
474 Pagination.fetch_paginated(query, opts, pagination)
477 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
478 list_memberships = Pleroma.List.memberships(opts[:user])
480 fetch_activities_query(recipients ++ list_memberships, opts)
481 |> fetch_paginated_optimized(opts, pagination)
483 |> maybe_update_cc(list_memberships, opts[:user])
486 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
487 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
488 opts = Map.delete(opts, :user)
490 [Constants.as_public()]
491 |> fetch_activities_query(opts)
492 |> restrict_unlisted(opts)
493 |> fetch_paginated_optimized(opts, pagination)
496 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
497 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
499 |> Map.put(:restrict_unlisted, true)
500 |> fetch_public_or_unlisted_activities(pagination)
503 @valid_visibilities ~w[direct unlisted public private]
505 defp restrict_visibility(query, %{visibility: visibility})
506 when is_list(visibility) do
507 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
512 "activity_visibility(?, ?, ?) = ANY (?)",
520 Logger.error("Could not restrict visibility to #{visibility}")
524 defp restrict_visibility(query, %{visibility: visibility})
525 when visibility in @valid_visibilities do
529 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
533 defp restrict_visibility(_query, %{visibility: visibility})
534 when visibility not in @valid_visibilities do
535 Logger.error("Could not restrict visibility to #{visibility}")
538 defp restrict_visibility(query, _visibility), do: query
540 defp exclude_visibility(query, %{exclude_visibilities: visibility})
541 when is_list(visibility) do
542 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
547 "activity_visibility(?, ?, ?) = ANY (?)",
555 Logger.error("Could not exclude visibility to #{visibility}")
560 defp exclude_visibility(query, %{exclude_visibilities: visibility})
561 when visibility in @valid_visibilities do
566 "activity_visibility(?, ?, ?) = ?",
575 defp exclude_visibility(query, %{exclude_visibilities: visibility})
576 when visibility not in [nil | @valid_visibilities] do
577 Logger.error("Could not exclude visibility to #{visibility}")
581 defp exclude_visibility(query, _visibility), do: query
583 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
586 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
589 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
592 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
596 defp restrict_thread_visibility(query, _, _), do: query
598 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
601 |> Map.put(:user, reading_user)
602 |> Map.put(:actor_id, user.ap_id)
605 godmode: params[:godmode],
606 reading_user: reading_user
608 |> user_activities_recipients()
609 |> fetch_activities(params)
613 def fetch_user_activities(user, reading_user, params \\ %{})
615 def fetch_user_activities(user, reading_user, %{total: true} = params) do
616 result = fetch_activities_for_user(user, reading_user, params)
618 Keyword.put(result, :items, Enum.reverse(result[:items]))
621 def fetch_user_activities(user, reading_user, params) do
623 |> fetch_activities_for_user(reading_user, params)
627 defp fetch_activities_for_user(user, reading_user, params) do
630 |> Map.put(:type, ["Create", "Announce"])
631 |> Map.put(:user, reading_user)
632 |> Map.put(:actor_id, user.ap_id)
633 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
636 if User.blocks?(reading_user, user) do
640 |> Map.put(:blocking_user, reading_user)
641 |> Map.put(:muting_user, reading_user)
644 pagination_type = Map.get(params, :pagination_type) || :keyset
647 godmode: params[:godmode],
648 reading_user: reading_user
650 |> user_activities_recipients()
651 |> fetch_activities(params, pagination_type)
654 def fetch_statuses(reading_user, %{total: true} = params) do
655 result = fetch_activities_for_reading_user(reading_user, params)
656 Keyword.put(result, :items, Enum.reverse(result[:items]))
659 def fetch_statuses(reading_user, params) do
661 |> fetch_activities_for_reading_user(params)
665 defp fetch_activities_for_reading_user(reading_user, params) do
666 params = Map.put(params, :type, ["Create", "Announce"])
669 godmode: params[:godmode],
670 reading_user: reading_user
672 |> user_activities_recipients()
673 |> fetch_activities(params, :offset)
676 defp user_activities_recipients(%{godmode: true}), do: []
678 defp user_activities_recipients(%{reading_user: reading_user}) do
680 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
682 [Constants.as_public()]
686 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
690 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
692 [activity, object] in query,
695 "?->>'type' != ? or ?->>'actor' != ?",
704 defp restrict_announce_object_actor(query, _), do: query
706 defp restrict_since(query, %{since_id: ""}), do: query
708 defp restrict_since(query, %{since_id: since_id}) do
709 from(activity in query, where: activity.id > ^since_id)
712 defp restrict_since(query, _), do: query
714 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
715 raise_on_missing_preload()
718 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
720 [_activity, object] in query,
721 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
725 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
726 restrict_embedded_tag_any(query, %{tag: tag})
729 defp restrict_embedded_tag_all(query, _), do: query
731 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
732 raise_on_missing_preload()
735 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
737 [_activity, object] in query,
738 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
742 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
743 restrict_embedded_tag_any(query, %{tag: [tag]})
746 defp restrict_embedded_tag_any(query, _), do: query
748 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
749 raise_on_missing_preload()
752 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
754 [_activity, object] in query,
755 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
759 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
760 when is_binary(tag_reject) do
761 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
764 defp restrict_embedded_tag_reject_any(query, _), do: query
766 defp object_ids_query_for_tags(tags) do
767 from(hto in "hashtags_objects")
768 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
769 |> where([hto, ht], ht.name in ^tags)
770 |> select([hto], hto.object_id)
771 |> distinct([hto], true)
774 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
775 raise_on_missing_preload()
778 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
779 restrict_hashtag_any(query, %{tag: single_tag})
782 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
784 [_activity, object] in query,
788 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
789 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
790 AND hashtags_objects.object_id = ?) @> ?
799 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
800 restrict_hashtag_all(query, %{tag_all: [tag]})
803 defp restrict_hashtag_all(query, _), do: query
805 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
806 raise_on_missing_preload()
809 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
811 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
814 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
816 [_activity, object] in query,
817 join: hto in "hashtags_objects",
818 on: hto.object_id == object.id,
819 where: hto.hashtag_id in ^hashtag_ids,
820 distinct: [desc: object.id],
821 order_by: [desc: object.id]
825 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
826 restrict_hashtag_any(query, %{tag: [tag]})
829 defp restrict_hashtag_any(query, _), do: query
831 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
832 raise_on_missing_preload()
835 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
837 [_activity, object] in query,
838 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
842 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
843 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
846 defp restrict_hashtag_reject_any(query, _), do: query
848 defp raise_on_missing_preload do
849 raise "Can't use the child object without preloading!"
852 defp restrict_recipients(query, [], _user), do: query
854 defp restrict_recipients(query, recipients, nil) do
855 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
858 defp restrict_recipients(query, recipients, user) do
861 where: fragment("? && ?", ^recipients, activity.recipients),
862 or_where: activity.actor == ^user.ap_id
866 defp restrict_local(query, %{local_only: true}) do
867 from(activity in query, where: activity.local == true)
870 defp restrict_local(query, _), do: query
872 defp restrict_remote(query, %{remote: true}) do
873 from(activity in query, where: activity.local == false)
876 defp restrict_remote(query, _), do: query
878 defp restrict_actor(query, %{actor_id: actor_id}) do
879 from(activity in query, where: activity.actor == ^actor_id)
882 defp restrict_actor(query, _), do: query
884 defp restrict_type(query, %{type: type}) when is_binary(type) do
885 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
888 defp restrict_type(query, %{type: type}) do
889 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
892 defp restrict_type(query, _), do: query
894 defp restrict_state(query, %{state: state}) do
895 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
898 defp restrict_state(query, _), do: query
900 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
902 [_activity, object] in query,
903 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
907 defp restrict_favorited_by(query, _), do: query
909 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
910 raise "Can't use the child object without preloading!"
913 defp restrict_media(query, %{only_media: true}) do
915 [activity, object] in query,
916 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
917 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
921 defp restrict_media(query, _), do: query
923 defp restrict_replies(query, %{exclude_replies: true}) do
925 [_activity, object] in query,
926 where: fragment("?->>'inReplyTo' is null", object.data)
930 defp restrict_replies(query, %{
931 reply_filtering_user: %User{} = user,
932 reply_visibility: "self"
935 [activity, object] in query,
938 "?->>'inReplyTo' is null OR ? = ANY(?)",
946 defp restrict_replies(query, %{
947 reply_filtering_user: %User{} = user,
948 reply_visibility: "following"
951 [activity, object] in query,
955 ?->>'type' != 'Create' -- This isn't a Create
956 OR ?->>'inReplyTo' is null -- this isn't a reply
957 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
958 -- unless they are the author (because authors
959 -- are also part of the recipients). This leads
960 -- to a bug that self-replies by friends won't
962 OR ? = ? -- The actor is us
966 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
975 defp restrict_replies(query, _), do: query
977 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
978 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
981 defp restrict_reblogs(query, _), do: query
983 defp restrict_muted(query, %{with_muted: true}), do: query
985 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
986 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
989 from([activity] in query,
990 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
993 "not (?->'to' \\?| ?) or ? = ?",
1001 unless opts[:skip_preload] do
1002 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1008 defp restrict_muted(query, _), do: query
1010 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1011 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1012 domain_blocks = user.domain_blocks || []
1014 following_ap_ids = User.get_friends_ap_ids(user)
1017 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1020 [activity, object: o] in query,
1021 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1024 "((not (? && ?)) or ? = ?)",
1025 activity.recipients,
1032 "recipients_contain_blocked_domains(?, ?) = false",
1033 activity.recipients,
1038 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1045 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1053 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1062 defp restrict_blocked(query, _), do: query
1064 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1069 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1071 ^[Constants.as_public()]
1076 defp restrict_unlisted(query, _), do: query
1078 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1080 [activity, object: o] in query,
1083 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1092 defp restrict_pinned(query, _), do: query
1094 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1095 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1101 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1109 defp restrict_muted_reblogs(query, _), do: query
1111 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1114 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1118 defp restrict_instance(query, _), do: query
1120 defp restrict_filtered(query, %{user: %User{} = user}) do
1121 case Filter.compose_regex(user) do
1126 from([activity, object] in query,
1128 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1129 activity.actor == ^user.ap_id
1134 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1135 restrict_filtered(query, %{user: user})
1138 defp restrict_filtered(query, _), do: query
1140 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1142 defp exclude_poll_votes(query, _) do
1143 if has_named_binding?(query, :object) do
1144 from([activity, object: o] in query,
1145 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1152 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1154 defp exclude_chat_messages(query, _) do
1155 if has_named_binding?(query, :object) do
1156 from([activity, object: o] in query,
1157 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1164 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1166 defp exclude_invisible_actors(query, _opts) do
1168 User.Query.build(%{invisible: true, select: [:ap_id]})
1170 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1172 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1175 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1176 from(activity in query, where: activity.id != ^id)
1179 defp exclude_id(query, _), do: query
1181 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1183 defp maybe_preload_objects(query, _) do
1185 |> Activity.with_preloaded_object()
1188 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1190 defp maybe_preload_bookmarks(query, opts) do
1192 |> Activity.with_preloaded_bookmark(opts[:user])
1195 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1197 |> Activity.with_preloaded_report_notes()
1200 defp maybe_preload_report_notes(query, _), do: query
1202 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1204 defp maybe_set_thread_muted_field(query, opts) do
1206 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1209 defp maybe_order(query, %{order: :desc}) do
1211 |> order_by(desc: :id)
1214 defp maybe_order(query, %{order: :asc}) do
1216 |> order_by(asc: :id)
1219 defp maybe_order(query, _), do: query
1221 defp normalize_fetch_activities_query_opts(opts) do
1222 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1224 value when is_bitstring(value) ->
1225 Map.put(opts, key, Hashtag.normalize_name(value))
1227 value when is_list(value) ->
1230 |> Enum.map(&Hashtag.normalize_name/1)
1233 Map.put(opts, key, normalized_value)
1241 defp fetch_activities_query_ap_ids_ops(opts) do
1242 source_user = opts[:muting_user]
1243 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1245 ap_id_relationships =
1246 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1247 [:block | ap_id_relationships]
1252 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1254 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1255 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1257 restrict_muted_reblogs_opts =
1258 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1260 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1263 def fetch_activities_query(recipients, opts \\ %{}) do
1264 opts = normalize_fetch_activities_query_opts(opts)
1266 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1267 fetch_activities_query_ap_ids_ops(opts)
1270 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1275 |> maybe_preload_objects(opts)
1276 |> maybe_preload_bookmarks(opts)
1277 |> maybe_preload_report_notes(opts)
1278 |> maybe_set_thread_muted_field(opts)
1279 |> maybe_order(opts)
1280 |> restrict_recipients(recipients, opts[:user])
1281 |> restrict_replies(opts)
1282 |> restrict_since(opts)
1283 |> restrict_local(opts)
1284 |> restrict_remote(opts)
1285 |> restrict_actor(opts)
1286 |> restrict_type(opts)
1287 |> restrict_state(opts)
1288 |> restrict_favorited_by(opts)
1289 |> restrict_blocked(restrict_blocked_opts)
1290 |> restrict_muted(restrict_muted_opts)
1291 |> restrict_filtered(opts)
1292 |> restrict_media(opts)
1293 |> restrict_visibility(opts)
1294 |> restrict_thread_visibility(opts, config)
1295 |> restrict_reblogs(opts)
1296 |> restrict_pinned(opts)
1297 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1298 |> restrict_instance(opts)
1299 |> restrict_announce_object_actor(opts)
1300 |> restrict_filtered(opts)
1301 |> Activity.restrict_deactivated_users()
1302 |> exclude_poll_votes(opts)
1303 |> exclude_chat_messages(opts)
1304 |> exclude_invisible_actors(opts)
1305 |> exclude_visibility(opts)
1307 if Config.feature_enabled?(:improved_hashtag_timeline) do
1309 |> restrict_hashtag_any(opts)
1310 |> restrict_hashtag_all(opts)
1311 |> restrict_hashtag_reject_any(opts)
1314 |> restrict_embedded_tag_any(opts)
1315 |> restrict_embedded_tag_all(opts)
1316 |> restrict_embedded_tag_reject_any(opts)
1321 Fetch favorites activities of user with order by sort adds to favorites
1323 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1324 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1326 |> Activity.Queries.by_actor()
1327 |> Activity.Queries.by_type("Like")
1328 |> Activity.with_joined_object()
1329 |> Object.with_joined_activity()
1330 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1331 |> order_by([like, _, _], desc_nulls_last: like.id)
1332 |> Pagination.fetch_paginated(
1333 Map.merge(params, %{skip_order: true}),
1338 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1339 Enum.map(activities, fn
1340 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1341 if Enum.any?(bcc, &(&1 in list_memberships)) do
1342 update_in(activity.data["cc"], &[user_ap_id | &1])
1352 defp maybe_update_cc(activities, _, _), do: activities
1354 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1355 from(activity in query,
1357 fragment("? && ?", activity.recipients, ^recipients) or
1358 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1359 ^Constants.as_public() in activity.recipients)
1363 def fetch_activities_bounded(
1365 recipients_with_public,
1367 pagination \\ :keyset
1369 fetch_activities_query([], opts)
1370 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1371 |> Pagination.fetch_paginated(opts, pagination)
1375 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1376 def upload(file, opts \\ []) do
1377 with {:ok, data} <- Upload.store(file, opts) do
1378 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1380 Repo.insert(%Object{data: obj_data})
1384 @spec get_actor_url(any()) :: binary() | nil
1385 defp get_actor_url(url) when is_binary(url), do: url
1386 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1388 defp get_actor_url(url) when is_list(url) do
1394 defp get_actor_url(_url), do: nil
1396 defp normalize_image(%{"url" => url}) do
1399 "url" => [%{"href" => url}]
1403 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1404 defp normalize_image(_), do: nil
1406 defp object_to_user_data(data) do
1409 |> Map.get("attachment", [])
1410 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1411 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1415 |> Map.get("tag", [])
1417 %{"type" => "Emoji"} -> true
1420 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1421 {String.trim(name, ":"), url}
1424 is_locked = data["manuallyApprovesFollowers"] || false
1425 capabilities = data["capabilities"] || %{}
1426 accepts_chat_messages = capabilities["acceptsChatMessages"]
1427 data = Transmogrifier.maybe_fix_user_object(data)
1428 is_discoverable = data["discoverable"] || false
1429 invisible = data["invisible"] || false
1430 actor_type = data["type"] || "Person"
1432 featured_address = data["featured"]
1433 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1436 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1437 data["publicKey"]["publicKeyPem"]
1443 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1444 data["endpoints"]["sharedInbox"]
1451 uri: get_actor_url(data["url"]),
1453 banner: normalize_image(data["image"]),
1456 is_locked: is_locked,
1457 is_discoverable: is_discoverable,
1458 invisible: invisible,
1459 avatar: normalize_image(data["icon"]),
1461 follower_address: data["followers"],
1462 following_address: data["following"],
1463 featured_address: featured_address,
1464 bio: data["summary"] || "",
1465 actor_type: actor_type,
1466 also_known_as: Map.get(data, "alsoKnownAs", []),
1467 public_key: public_key,
1468 inbox: data["inbox"],
1469 shared_inbox: shared_inbox,
1470 accepts_chat_messages: accepts_chat_messages,
1471 pinned_objects: pinned_objects
1474 # nickname can be nil because of virtual actors
1475 if data["preferredUsername"] do
1479 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1482 Map.put(user_data, :nickname, nil)
1486 def fetch_follow_information_for_user(user) do
1487 with {:ok, following_data} <-
1488 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1489 {:ok, hide_follows} <- collection_private(following_data),
1490 {:ok, followers_data} <-
1491 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1492 {:ok, hide_followers} <- collection_private(followers_data) do
1495 hide_follows: hide_follows,
1496 follower_count: normalize_counter(followers_data["totalItems"]),
1497 following_count: normalize_counter(following_data["totalItems"]),
1498 hide_followers: hide_followers
1501 {:error, _} = e -> e
1506 defp normalize_counter(counter) when is_integer(counter), do: counter
1507 defp normalize_counter(_), do: 0
1509 def maybe_update_follow_information(user_data) do
1510 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1511 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1513 {:collections_available,
1514 !!(user_data[:following_address] && user_data[:follower_address])},
1516 fetch_follow_information_for_user(user_data) do
1517 info = Map.merge(user_data[:info] || %{}, info)
1520 |> Map.put(:info, info)
1522 {:user_type_check, false} ->
1525 {:collections_available, false} ->
1528 {:enabled, false} ->
1533 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1540 defp collection_private(%{"first" => %{"type" => type}})
1541 when type in ["CollectionPage", "OrderedCollectionPage"],
1544 defp collection_private(%{"first" => first}) do
1545 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1546 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1549 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1550 {:error, _} = e -> e
1555 defp collection_private(_data), do: {:ok, true}
1557 def user_data_from_user_object(data) do
1558 with {:ok, data} <- MRF.filter(data) do
1559 {:ok, object_to_user_data(data)}
1565 def fetch_and_prepare_user_from_ap_id(ap_id) do
1566 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1567 {:ok, data} <- user_data_from_user_object(data) do
1568 {:ok, maybe_update_follow_information(data)}
1570 # If this has been deleted, only log a debug and not an error
1571 {:error, "Object has been deleted" = e} ->
1572 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1575 {:error, {:reject, reason} = e} ->
1576 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1580 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1585 def maybe_handle_clashing_nickname(data) do
1586 with nickname when is_binary(nickname) <- data[:nickname],
1587 %User{} = old_user <- User.get_by_nickname(nickname),
1588 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1590 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1596 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1597 |> User.update_and_set_cache()
1599 {:ap_id_comparison, true} ->
1601 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1609 def pin_data_from_featured_collection(%{
1611 "orderedItems" => objects
1613 when type in ["OrderedCollection", "Collection"] do
1614 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1617 def fetch_and_prepare_featured_from_ap_id(nil) do
1621 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1622 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1623 {:ok, pin_data_from_featured_collection(data)}
1626 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1631 def pinned_fetch_task(nil), do: nil
1633 def pinned_fetch_task(%{pinned_objects: pins}) do
1634 if Enum.all?(pins, fn {ap_id, _} ->
1635 Object.get_cached_by_ap_id(ap_id) ||
1636 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1644 def make_user_from_ap_id(ap_id) do
1645 user = User.get_cached_by_ap_id(ap_id)
1647 if user && !User.ap_enabled?(user) do
1648 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1650 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1651 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1655 |> User.remote_user_changeset(data)
1656 |> User.update_and_set_cache()
1658 maybe_handle_clashing_nickname(data)
1661 |> User.remote_user_changeset()
1669 def make_user_from_nickname(nickname) do
1670 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1671 make_user_from_ap_id(ap_id)
1673 _e -> {:error, "No AP id in WebFinger"}
1677 # filter out broken threads
1678 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1679 entire_thread_visible_for_user?(activity, user)
1682 # do post-processing on a specific activity
1683 def contain_activity(%Activity{} = activity, %User{} = user) do
1684 contain_broken_threads(activity, user)
1687 def fetch_direct_messages_query do
1689 |> restrict_type(%{type: "Create"})
1690 |> restrict_visibility(%{visibility: "direct"})
1691 |> order_by([activity], asc: activity.id)