1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39 defp get_recipients(%{"type" => "Create"} = data) do
40 to = Map.get(data, "to", [])
41 cc = Map.get(data, "cc", [])
42 bcc = Map.get(data, "bcc", [])
43 actor = Map.get(data, "actor", [])
44 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
48 defp get_recipients(data) do
49 to = Map.get(data, "to", [])
50 cc = Map.get(data, "cc", [])
51 bcc = Map.get(data, "bcc", [])
52 recipients = Enum.concat([to, cc, bcc])
56 defp check_actor_is_active(nil), do: true
58 defp check_actor_is_active(actor) when is_binary(actor) do
59 case User.get_cached_by_ap_id(actor) do
60 %User{is_active: true} -> true
65 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
66 limit = Config.get([:instance, :remote_limit])
67 String.length(content) <= limit
70 defp check_remote_limit(_), do: true
72 def increase_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
76 def decrease_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
80 defp increase_replies_count_if_reply(%{
81 "object" => %{"inReplyTo" => reply_ap_id} = object,
84 if is_public?(object) do
85 Object.increase_replies_count(reply_ap_id)
89 defp increase_replies_count_if_reply(_create_data), do: :noop
91 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
93 def persist(%{"type" => type} = object, meta) when type in @object_types do
94 with {:ok, object} <- Object.create(object) do
100 def persist(object, meta) do
101 with local <- Keyword.fetch!(meta, :local),
102 {recipients, _, _} <- get_recipients(object),
104 Repo.insert(%Activity{
107 recipients: recipients,
108 actor: object["actor"]
110 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
111 {:ok, _} <- maybe_create_activity_expiration(activity) do
112 {:ok, activity, meta}
116 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
117 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
118 with nil <- Activity.normalize(map),
119 map <- lazy_put_activity_defaults(map, fake),
120 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
121 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
122 {:ok, map} <- MRF.filter(map),
123 {recipients, _, _} = get_recipients(map),
124 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
125 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
126 {:ok, map, object} <- insert_full_object(map),
127 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
128 # Splice in the child object if we have one.
129 activity = Maps.put_if_present(activity, :object, object)
131 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
132 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
137 %Activity{} = activity ->
143 {:containment, _} = error ->
146 {:error, _} = error ->
149 {:fake, true, map, recipients} ->
150 activity = %Activity{
154 recipients: recipients,
158 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
161 {:remote_limit_pass, _} ->
162 {:error, :remote_limit}
169 defp insert_activity_with_expiration(data, local, recipients) do
173 actor: data["actor"],
174 recipients: recipients
177 with {:ok, activity} <- Repo.insert(struct) do
178 maybe_create_activity_expiration(activity)
182 def notify_and_stream(activity) do
183 Notification.create_notifications(activity)
185 conversation = create_or_bump_conversation(activity, activity.actor)
186 participations = get_participations(conversation)
188 stream_out_participations(participations)
191 defp maybe_create_activity_expiration(
192 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
195 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
196 activity_id: activity.id,
197 expires_at: expires_at
203 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
205 defp create_or_bump_conversation(activity, actor) do
206 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
207 %User{} = user <- User.get_cached_by_ap_id(actor) do
208 Participation.mark_as_read(user, conversation)
213 defp get_participations({:ok, conversation}) do
215 |> Repo.preload(:participations, force: true)
216 |> Map.get(:participations)
219 defp get_participations(_), do: []
221 def stream_out_participations(participations) do
224 |> Repo.preload(:user)
226 Streamer.stream("participation", participations)
230 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
231 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
232 conversation = Repo.preload(conversation, :participations)
235 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
240 if last_activity_id do
241 stream_out_participations(conversation.participations)
247 def stream_out_participations(_, _), do: :noop
250 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
251 when data_type in ["Create", "Announce", "Delete"] do
253 |> Topics.get_activity_topics()
254 |> Streamer.stream(activity)
258 def stream_out(_activity) do
262 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
263 def create(params, fake \\ false) do
264 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
269 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
274 quick_insert? = Config.get([:env]) == :benchmark
278 %{to: to, actor: actor, published: published, context: context, object: object},
282 with {:ok, activity} <- insert(create_data, local, fake),
283 {:fake, false, activity} <- {:fake, fake, activity},
284 _ <- increase_replies_count_if_reply(create_data),
285 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
286 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
287 _ <- notify_and_stream(activity),
288 :ok <- maybe_federate(activity) do
291 {:quick_insert, true, activity} ->
294 {:fake, true, activity} ->
298 Repo.rollback(message)
302 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
303 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
304 additional = params[:additional] || %{}
305 # only accept false as false value
306 local = !(params[:local] == false)
307 published = params[:published]
311 %{to: to, actor: actor, published: published, context: context, object: object},
315 with {:ok, activity} <- insert(listen_data, local),
316 _ <- notify_and_stream(activity),
317 :ok <- maybe_federate(activity) do
322 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
323 {:ok, Activity.t()} | nil | {:error, any()}
324 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
325 with {:ok, result} <-
326 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
331 defp do_unfollow(follower, followed, activity_id, local) do
332 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
333 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
334 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
335 {:ok, activity} <- insert(unfollow_data, local),
336 _ <- notify_and_stream(activity),
337 :ok <- maybe_federate(activity) do
341 {:error, error} -> Repo.rollback(error)
345 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
347 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
361 # only accept false as false value
362 local = !(params[:local] == false)
363 forward = !(params[:forward] == false)
365 additional = params[:additional] || %{}
369 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
371 Map.merge(additional, %{"to" => [], "cc" => []})
374 with flag_data <- make_flag_data(params, additional),
375 {:ok, activity} <- insert(flag_data, local),
376 {:ok, stripped_activity} <- strip_report_status_data(activity),
377 _ <- notify_and_stream(activity),
379 maybe_federate(stripped_activity) do
380 User.all_superusers()
381 |> Enum.filter(fn user -> user.ap_id != actor end)
382 |> Enum.filter(fn user -> not is_nil(user.email) end)
383 |> Enum.each(fn superuser ->
385 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
386 |> Pleroma.Emails.Mailer.deliver_async()
391 {:error, error} -> Repo.rollback(error)
395 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
396 def move(%User{} = origin, %User{} = target, local \\ true) do
399 "actor" => origin.ap_id,
400 "object" => origin.ap_id,
401 "target" => target.ap_id
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_recipients(recipients, opts[:user])
435 |> restrict_filtered(opts)
439 "?->>'type' = ? and ?->>'context' = ?",
446 |> exclude_poll_votes(opts)
448 |> order_by([activity], desc: activity.id)
451 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
452 def fetch_activities_for_context(context, opts \\ %{}) do
454 |> fetch_activities_for_context_query(opts)
458 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
459 FlakeId.Ecto.CompatType.t() | nil
460 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
462 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
463 |> restrict_visibility(%{visibility: "direct"})
469 defp fetch_paginated_optimized(query, opts, pagination) do
470 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
471 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
472 opts = Map.put(opts, :skip_extra_order, true)
474 Pagination.fetch_paginated(query, opts, pagination)
477 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
478 list_memberships = Pleroma.List.memberships(opts[:user])
480 fetch_activities_query(recipients ++ list_memberships, opts)
481 |> fetch_paginated_optimized(opts, pagination)
483 |> maybe_update_cc(list_memberships, opts[:user])
486 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
487 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
488 opts = Map.delete(opts, :user)
490 [Constants.as_public()]
491 |> fetch_activities_query(opts)
492 |> restrict_unlisted(opts)
493 |> fetch_paginated_optimized(opts, pagination)
496 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
497 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
499 |> Map.put(:restrict_unlisted, true)
500 |> fetch_public_or_unlisted_activities(pagination)
503 @valid_visibilities ~w[direct unlisted public private]
505 defp restrict_visibility(query, %{visibility: visibility})
506 when is_list(visibility) do
507 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
512 "activity_visibility(?, ?, ?) = ANY (?)",
520 Logger.error("Could not restrict visibility to #{visibility}")
524 defp restrict_visibility(query, %{visibility: visibility})
525 when visibility in @valid_visibilities do
529 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
533 defp restrict_visibility(_query, %{visibility: visibility})
534 when visibility not in @valid_visibilities do
535 Logger.error("Could not restrict visibility to #{visibility}")
538 defp restrict_visibility(query, _visibility), do: query
540 defp exclude_visibility(query, %{exclude_visibilities: visibility})
541 when is_list(visibility) do
542 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
547 "activity_visibility(?, ?, ?) = ANY (?)",
555 Logger.error("Could not exclude visibility to #{visibility}")
560 defp exclude_visibility(query, %{exclude_visibilities: visibility})
561 when visibility in @valid_visibilities do
566 "activity_visibility(?, ?, ?) = ?",
575 defp exclude_visibility(query, %{exclude_visibilities: visibility})
576 when visibility not in [nil | @valid_visibilities] do
577 Logger.error("Could not exclude visibility to #{visibility}")
581 defp exclude_visibility(query, _visibility), do: query
583 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
586 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
589 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
592 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
596 defp restrict_thread_visibility(query, _, _), do: query
598 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
601 |> Map.put(:user, reading_user)
602 |> Map.put(:actor_id, user.ap_id)
605 godmode: params[:godmode],
606 reading_user: reading_user
608 |> user_activities_recipients()
609 |> fetch_activities(params)
613 def fetch_user_activities(user, reading_user, params \\ %{})
615 def fetch_user_activities(user, reading_user, %{total: true} = params) do
616 result = fetch_activities_for_user(user, reading_user, params)
618 Keyword.put(result, :items, Enum.reverse(result[:items]))
621 def fetch_user_activities(user, reading_user, params) do
623 |> fetch_activities_for_user(reading_user, params)
627 defp fetch_activities_for_user(user, reading_user, params) do
630 |> Map.put(:type, ["Create", "Announce"])
631 |> Map.put(:user, reading_user)
632 |> Map.put(:actor_id, user.ap_id)
633 |> Map.put(:pinned_activity_ids, user.pinned_activities)
636 if User.blocks?(reading_user, user) do
640 |> Map.put(:blocking_user, reading_user)
641 |> Map.put(:muting_user, reading_user)
644 pagination_type = Map.get(params, :pagination_type) || :keyset
647 godmode: params[:godmode],
648 reading_user: reading_user
650 |> user_activities_recipients()
651 |> fetch_activities(params, pagination_type)
654 def fetch_statuses(reading_user, %{total: true} = params) do
655 result = fetch_activities_for_reading_user(reading_user, params)
656 Keyword.put(result, :items, Enum.reverse(result[:items]))
659 def fetch_statuses(reading_user, params) do
661 |> fetch_activities_for_reading_user(params)
665 defp fetch_activities_for_reading_user(reading_user, params) do
666 params = Map.put(params, :type, ["Create", "Announce"])
669 godmode: params[:godmode],
670 reading_user: reading_user
672 |> user_activities_recipients()
673 |> fetch_activities(params, :offset)
676 defp user_activities_recipients(%{godmode: true}), do: []
678 defp user_activities_recipients(%{reading_user: reading_user}) do
680 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
682 [Constants.as_public()]
686 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
690 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
692 [activity, object] in query,
695 "?->>'type' != ? or ?->>'actor' != ?",
704 defp restrict_announce_object_actor(query, _), do: query
706 defp restrict_since(query, %{since_id: ""}), do: query
708 defp restrict_since(query, %{since_id: since_id}) do
709 from(activity in query, where: activity.id > ^since_id)
712 defp restrict_since(query, _), do: query
714 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
715 raise_on_missing_preload()
718 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
720 [_activity, object] in query,
721 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
725 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
726 restrict_embedded_tag_any(query, %{tag: tag})
729 defp restrict_embedded_tag_all(query, _), do: query
731 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
732 raise_on_missing_preload()
735 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
737 [_activity, object] in query,
738 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
742 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
743 restrict_embedded_tag_any(query, %{tag: [tag]})
746 defp restrict_embedded_tag_any(query, _), do: query
748 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
749 raise_on_missing_preload()
752 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
754 [_activity, object] in query,
755 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
759 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
760 when is_binary(tag_reject) do
761 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
764 defp restrict_embedded_tag_reject_any(query, _), do: query
766 defp object_ids_query_for_tags(tags) do
767 from(hto in "hashtags_objects")
768 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
769 |> where([hto, ht], ht.name in ^tags)
770 |> select([hto], hto.object_id)
771 |> distinct([hto], true)
774 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
775 raise_on_missing_preload()
778 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
779 restrict_hashtag_any(query, %{tag: single_tag})
782 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
784 [_activity, object] in query,
788 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
789 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
790 AND hashtags_objects.object_id = ?) @> ?
799 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
800 restrict_hashtag_all(query, %{tag_all: [tag]})
803 defp restrict_hashtag_all(query, _), do: query
805 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
806 raise_on_missing_preload()
809 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
811 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
814 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
816 [_activity, object] in query,
817 join: hto in "hashtags_objects",
818 on: hto.object_id == object.id,
819 where: hto.hashtag_id in ^hashtag_ids,
820 distinct: [desc: object.id],
821 order_by: [desc: object.id]
825 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
826 restrict_hashtag_any(query, %{tag: [tag]})
829 defp restrict_hashtag_any(query, _), do: query
831 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
832 raise_on_missing_preload()
835 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
837 [_activity, object] in query,
838 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
842 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
843 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
846 defp restrict_hashtag_reject_any(query, _), do: query
848 defp raise_on_missing_preload do
849 raise "Can't use the child object without preloading!"
852 defp restrict_recipients(query, [], _user), do: query
854 defp restrict_recipients(query, recipients, nil) do
855 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
858 defp restrict_recipients(query, recipients, user) do
861 where: fragment("? && ?", ^recipients, activity.recipients),
862 or_where: activity.actor == ^user.ap_id
866 defp restrict_local(query, %{local_only: true}) do
867 from(activity in query, where: activity.local == true)
870 defp restrict_local(query, _), do: query
872 defp restrict_remote(query, %{remote: true}) do
873 from(activity in query, where: activity.local == false)
876 defp restrict_remote(query, _), do: query
878 defp restrict_actor(query, %{actor_id: actor_id}) do
879 from(activity in query, where: activity.actor == ^actor_id)
882 defp restrict_actor(query, _), do: query
884 defp restrict_type(query, %{type: type}) when is_binary(type) do
885 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
888 defp restrict_type(query, %{type: type}) do
889 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
892 defp restrict_type(query, _), do: query
894 defp restrict_state(query, %{state: state}) do
895 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
898 defp restrict_state(query, _), do: query
900 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
902 [_activity, object] in query,
903 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
907 defp restrict_favorited_by(query, _), do: query
909 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
910 raise "Can't use the child object without preloading!"
913 defp restrict_media(query, %{only_media: true}) do
915 [activity, object] in query,
916 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
917 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
921 defp restrict_media(query, _), do: query
923 defp restrict_replies(query, %{exclude_replies: true}) do
925 [_activity, object] in query,
926 where: fragment("?->>'inReplyTo' is null", object.data)
930 defp restrict_replies(query, %{
931 reply_filtering_user: %User{} = user,
932 reply_visibility: "self"
935 [activity, object] in query,
938 "?->>'inReplyTo' is null OR ? = ANY(?)",
946 defp restrict_replies(query, %{
947 reply_filtering_user: %User{} = user,
948 reply_visibility: "following"
951 [activity, object] in query,
955 ?->>'type' != 'Create' -- This isn't a Create
956 OR ?->>'inReplyTo' is null -- this isn't a reply
957 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
958 -- unless they are the author (because authors
959 -- are also part of the recipients). This leads
960 -- to a bug that self-replies by friends won't
962 OR ? = ? -- The actor is us
966 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
975 defp restrict_replies(query, _), do: query
977 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
978 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
981 defp restrict_reblogs(query, _), do: query
983 defp restrict_muted(query, %{with_muted: true}), do: query
985 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
986 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
989 from([activity] in query,
990 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
993 "not (?->'to' \\?| ?) or ? = ?",
1001 unless opts[:skip_preload] do
1002 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1008 defp restrict_muted(query, _), do: query
1010 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1011 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1012 domain_blocks = user.domain_blocks || []
1014 following_ap_ids = User.get_friends_ap_ids(user)
1017 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1020 [activity, object: o] in query,
1021 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1024 "((not (? && ?)) or ? = ?)",
1025 activity.recipients,
1032 "recipients_contain_blocked_domains(?, ?) = false",
1033 activity.recipients,
1038 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1045 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1053 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1062 defp restrict_blocked(query, _), do: query
1064 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1069 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1071 ^[Constants.as_public()]
1076 defp restrict_unlisted(query, _), do: query
1078 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1079 from(activity in query, where: activity.id in ^ids)
1082 defp restrict_pinned(query, _), do: query
1084 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1085 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1091 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1099 defp restrict_muted_reblogs(query, _), do: query
1101 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1104 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1108 defp restrict_instance(query, _), do: query
1110 defp restrict_filtered(query, %{user: %User{} = user}) do
1111 case Filter.compose_regex(user) do
1116 from([activity, object] in query,
1118 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1119 activity.actor == ^user.ap_id
1124 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1125 restrict_filtered(query, %{user: user})
1128 defp restrict_filtered(query, _), do: query
1130 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1132 defp exclude_poll_votes(query, _) do
1133 if has_named_binding?(query, :object) do
1134 from([activity, object: o] in query,
1135 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1142 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1144 defp exclude_chat_messages(query, _) do
1145 if has_named_binding?(query, :object) do
1146 from([activity, object: o] in query,
1147 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1154 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1156 defp exclude_invisible_actors(query, _opts) do
1158 User.Query.build(%{invisible: true, select: [:ap_id]})
1160 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1162 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1165 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1166 from(activity in query, where: activity.id != ^id)
1169 defp exclude_id(query, _), do: query
1171 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1173 defp maybe_preload_objects(query, _) do
1175 |> Activity.with_preloaded_object()
1178 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1180 defp maybe_preload_bookmarks(query, opts) do
1182 |> Activity.with_preloaded_bookmark(opts[:user])
1185 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1187 |> Activity.with_preloaded_report_notes()
1190 defp maybe_preload_report_notes(query, _), do: query
1192 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1194 defp maybe_set_thread_muted_field(query, opts) do
1196 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1199 defp maybe_order(query, %{order: :desc}) do
1201 |> order_by(desc: :id)
1204 defp maybe_order(query, %{order: :asc}) do
1206 |> order_by(asc: :id)
1209 defp maybe_order(query, _), do: query
1211 defp normalize_fetch_activities_query_opts(opts) do
1212 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1214 value when is_bitstring(value) ->
1215 Map.put(opts, key, Hashtag.normalize_name(value))
1217 value when is_list(value) ->
1220 |> Enum.map(&Hashtag.normalize_name/1)
1223 Map.put(opts, key, normalized_value)
1231 defp fetch_activities_query_ap_ids_ops(opts) do
1232 source_user = opts[:muting_user]
1233 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1235 ap_id_relationships =
1236 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1237 [:block | ap_id_relationships]
1242 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1244 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1245 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1247 restrict_muted_reblogs_opts =
1248 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1250 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1253 def fetch_activities_query(recipients, opts \\ %{}) do
1254 opts = normalize_fetch_activities_query_opts(opts)
1256 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1257 fetch_activities_query_ap_ids_ops(opts)
1260 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1265 |> maybe_preload_objects(opts)
1266 |> maybe_preload_bookmarks(opts)
1267 |> maybe_preload_report_notes(opts)
1268 |> maybe_set_thread_muted_field(opts)
1269 |> maybe_order(opts)
1270 |> restrict_recipients(recipients, opts[:user])
1271 |> restrict_replies(opts)
1272 |> restrict_since(opts)
1273 |> restrict_local(opts)
1274 |> restrict_remote(opts)
1275 |> restrict_actor(opts)
1276 |> restrict_type(opts)
1277 |> restrict_state(opts)
1278 |> restrict_favorited_by(opts)
1279 |> restrict_blocked(restrict_blocked_opts)
1280 |> restrict_muted(restrict_muted_opts)
1281 |> restrict_filtered(opts)
1282 |> restrict_media(opts)
1283 |> restrict_visibility(opts)
1284 |> restrict_thread_visibility(opts, config)
1285 |> restrict_reblogs(opts)
1286 |> restrict_pinned(opts)
1287 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1288 |> restrict_instance(opts)
1289 |> restrict_announce_object_actor(opts)
1290 |> restrict_filtered(opts)
1291 |> Activity.restrict_deactivated_users()
1292 |> exclude_poll_votes(opts)
1293 |> exclude_chat_messages(opts)
1294 |> exclude_invisible_actors(opts)
1295 |> exclude_visibility(opts)
1297 if Config.feature_enabled?(:improved_hashtag_timeline) do
1299 |> restrict_hashtag_any(opts)
1300 |> restrict_hashtag_all(opts)
1301 |> restrict_hashtag_reject_any(opts)
1304 |> restrict_embedded_tag_any(opts)
1305 |> restrict_embedded_tag_all(opts)
1306 |> restrict_embedded_tag_reject_any(opts)
1311 Fetch favorites activities of user with order by sort adds to favorites
1313 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1314 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1316 |> Activity.Queries.by_actor()
1317 |> Activity.Queries.by_type("Like")
1318 |> Activity.with_joined_object()
1319 |> Object.with_joined_activity()
1320 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1321 |> order_by([like, _, _], desc_nulls_last: like.id)
1322 |> Pagination.fetch_paginated(
1323 Map.merge(params, %{skip_order: true}),
1328 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1329 Enum.map(activities, fn
1330 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1331 if Enum.any?(bcc, &(&1 in list_memberships)) do
1332 update_in(activity.data["cc"], &[user_ap_id | &1])
1342 defp maybe_update_cc(activities, _, _), do: activities
1344 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1345 from(activity in query,
1347 fragment("? && ?", activity.recipients, ^recipients) or
1348 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1349 ^Constants.as_public() in activity.recipients)
1353 def fetch_activities_bounded(
1355 recipients_with_public,
1357 pagination \\ :keyset
1359 fetch_activities_query([], opts)
1360 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1361 |> Pagination.fetch_paginated(opts, pagination)
1365 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1366 def upload(file, opts \\ []) do
1367 with {:ok, data} <- Upload.store(file, opts) do
1368 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1370 Repo.insert(%Object{data: obj_data})
1374 @spec get_actor_url(any()) :: binary() | nil
1375 defp get_actor_url(url) when is_binary(url), do: url
1376 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1378 defp get_actor_url(url) when is_list(url) do
1384 defp get_actor_url(_url), do: nil
1386 defp normalize_image(%{"url" => url}) do
1389 "url" => [%{"href" => url}]
1393 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1394 defp normalize_image(_), do: nil
1396 defp object_to_user_data(data) do
1399 |> Map.get("attachment", [])
1400 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1401 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1405 |> Map.get("tag", [])
1407 %{"type" => "Emoji"} -> true
1410 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1411 {String.trim(name, ":"), url}
1414 is_locked = data["manuallyApprovesFollowers"] || false
1415 capabilities = data["capabilities"] || %{}
1416 accepts_chat_messages = capabilities["acceptsChatMessages"]
1417 data = Transmogrifier.maybe_fix_user_object(data)
1418 is_discoverable = data["discoverable"] || false
1419 invisible = data["invisible"] || false
1420 actor_type = data["type"] || "Person"
1423 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1424 data["publicKey"]["publicKeyPem"]
1430 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1431 data["endpoints"]["sharedInbox"]
1438 uri: get_actor_url(data["url"]),
1440 banner: normalize_image(data["image"]),
1443 is_locked: is_locked,
1444 is_discoverable: is_discoverable,
1445 invisible: invisible,
1446 avatar: normalize_image(data["icon"]),
1448 follower_address: data["followers"],
1449 following_address: data["following"],
1450 bio: data["summary"] || "",
1451 actor_type: actor_type,
1452 also_known_as: Map.get(data, "alsoKnownAs", []),
1453 public_key: public_key,
1454 inbox: data["inbox"],
1455 shared_inbox: shared_inbox,
1456 accepts_chat_messages: accepts_chat_messages
1459 # nickname can be nil because of virtual actors
1460 if data["preferredUsername"] do
1464 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1467 Map.put(user_data, :nickname, nil)
1471 def fetch_follow_information_for_user(user) do
1472 with {:ok, following_data} <-
1473 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1474 {:ok, hide_follows} <- collection_private(following_data),
1475 {:ok, followers_data} <-
1476 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1477 {:ok, hide_followers} <- collection_private(followers_data) do
1480 hide_follows: hide_follows,
1481 follower_count: normalize_counter(followers_data["totalItems"]),
1482 following_count: normalize_counter(following_data["totalItems"]),
1483 hide_followers: hide_followers
1486 {:error, _} = e -> e
1491 defp normalize_counter(counter) when is_integer(counter), do: counter
1492 defp normalize_counter(_), do: 0
1494 def maybe_update_follow_information(user_data) do
1495 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1496 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1498 {:collections_available,
1499 !!(user_data[:following_address] && user_data[:follower_address])},
1501 fetch_follow_information_for_user(user_data) do
1502 info = Map.merge(user_data[:info] || %{}, info)
1505 |> Map.put(:info, info)
1507 {:user_type_check, false} ->
1510 {:collections_available, false} ->
1513 {:enabled, false} ->
1518 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1525 defp collection_private(%{"first" => %{"type" => type}})
1526 when type in ["CollectionPage", "OrderedCollectionPage"],
1529 defp collection_private(%{"first" => first}) do
1530 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1531 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1534 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1535 {:error, _} = e -> e
1540 defp collection_private(_data), do: {:ok, true}
1542 def user_data_from_user_object(data) do
1543 with {:ok, data} <- MRF.filter(data) do
1544 {:ok, object_to_user_data(data)}
1550 def fetch_and_prepare_user_from_ap_id(ap_id) do
1551 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1552 {:ok, data} <- user_data_from_user_object(data) do
1553 {:ok, maybe_update_follow_information(data)}
1555 # If this has been deleted, only log a debug and not an error
1556 {:error, "Object has been deleted" = e} ->
1557 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1560 {:error, {:reject, reason} = e} ->
1561 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1565 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1570 def maybe_handle_clashing_nickname(data) do
1571 with nickname when is_binary(nickname) <- data[:nickname],
1572 %User{} = old_user <- User.get_by_nickname(nickname),
1573 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1575 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1581 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1582 |> User.update_and_set_cache()
1584 {:ap_id_comparison, true} ->
1586 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1594 def make_user_from_ap_id(ap_id) do
1595 user = User.get_cached_by_ap_id(ap_id)
1597 if user && !User.ap_enabled?(user) do
1598 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1600 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1603 |> User.remote_user_changeset(data)
1604 |> User.update_and_set_cache()
1606 maybe_handle_clashing_nickname(data)
1609 |> User.remote_user_changeset()
1617 def make_user_from_nickname(nickname) do
1618 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1619 make_user_from_ap_id(ap_id)
1621 _e -> {:error, "No AP id in WebFinger"}
1625 # filter out broken threads
1626 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1627 entire_thread_visible_for_user?(activity, user)
1630 # do post-processing on a specific activity
1631 def contain_activity(%Activity{} = activity, %User{} = user) do
1632 contain_broken_threads(activity, user)
1635 def fetch_direct_messages_query do
1637 |> restrict_type(%{type: "Create"})
1638 |> restrict_visibility(%{visibility: "direct"})
1639 |> order_by([activity], asc: activity.id)