1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Collects the addressing fields of an activity map.
# "Create" clause: merges "to", "cc", "bcc" and the actor into one list and
# de-duplicates it.
# NOTE(review): a missing "actor" defaults to `[]`, which is then wrapped as
# `[[]]` and concatenated, so an empty list would become a recipient — confirm
# that "actor" is always present here.
# NOTE(review): callers match the result as `{recipients, _, _}`; the return
# expression itself is not visible in this excerpt.
39 defp get_recipients(%{"type" => "Create"} = data) do
40 to = Map.get(data, "to", [])
41 cc = Map.get(data, "cc", [])
42 bcc = Map.get(data, "bcc", [])
43 actor = Map.get(data, "actor", [])
44 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Generic clause: concatenates "to", "cc" and "bcc" as-is (no actor added, no
# de-duplication).
48 defp get_recipients(data) do
49 to = Map.get(data, "to", [])
50 cc = Map.get(data, "cc", [])
51 bcc = Map.get(data, "bcc", [])
52 recipients = Enum.concat([to, cc, bcc])
# Actor activity check used by insert/4.
# A nil actor passes. A binary actor is looked up with
# User.get_cached_by_ap_id/1 and passes when the user has `is_active: true`.
# (The remaining case branches are not visible in this excerpt.)
56 defp check_actor_is_active(nil), do: true
58 defp check_actor_is_active(actor) when is_binary(actor) do
59 case User.get_cached_by_ap_id(actor) do
60 %User{is_active: true} -> true
# Compares the length of the embedded object's "content" with the configured
# [:instance, :remote_limit] and returns whether it is within the limit.
# NOTE(review): the guard only rules out nil, so a non-binary "content" would
# reach String.length/1 — confirm content is always a string at this point.
65 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
66 limit = Config.get([:instance, :remote_limit])
67 String.length(content) <= limit
# Anything without a non-nil "content" inside a map "object" passes.
70 defp check_remote_limit(_), do: true
# Calls User.increase_note_count/1 for `actor` when is_public?(object) holds;
# otherwise returns `{:ok, actor}` without touching the counter.
72 def increase_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Calls User.decrease_note_count/1 for `actor` when is_public?(object) holds;
# otherwise returns `{:ok, actor}` without touching the counter.
76 def decrease_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# For create data whose "object" carries an "inReplyTo", bumps the replies
# count of the replied-to object via Object.increase_replies_count/1, but only
# when the object is public. (Part of the head pattern is not visible in this
# excerpt.)
80 defp increase_replies_count_if_reply(%{
81 "object" => %{"inReplyTo" => reply_ap_id} = object,
84 if is_public?(object) do
85 Object.increase_replies_count(reply_ap_id)
# Any other input is a no-op.
89 defp increase_replies_count_if_reply(_create_data), do: :noop
# Object types that persist/2 routes through Object.create/1.
91 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# First clause: data whose "type" is one of @object_types is stored with
# Object.create/1. (The success return is not visible in this excerpt.)
93 def persist(%{"type" => type} = object, meta) when type in @object_types do
94 with {:ok, object} <- Object.create(object) do
# Second clause: everything else is stored as an %Activity{}.
# `meta` must contain :local (Keyword.fetch!/2 raises otherwise). Recipients
# come from get_recipients/1 and the actor from object["actor"]. On success
# returns `{:ok, activity, meta}` after scheduling expiration if applicable.
100 def persist(object, meta) do
101 with local <- Keyword.fetch!(meta, :local),
102 {recipients, _, _} <- get_recipients(object),
104 Repo.insert(%Activity{
107 recipients: recipients,
108 actor: object["actor"]
110 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
111 {:ok, _} <- maybe_create_activity_expiration(activity) do
112 {:ok, activity, meta}
# Inserts an activity map, running it through a `with` chain in this order:
#   1. continue only when Activity.normalize(map) is nil;
#   2. fill defaults via lazy_put_activity_defaults/2;
#   3. check the actor is active (skipped when bypass_actor_check is truthy);
#   4. apply the remote content limit;
#   5. run MRF.filter/1;
#   6. compute recipients (hard match — a different shape raises MatchError);
#   7. branch out of the chain when `fake` is true;
#   8. run Containment.contain_child/1;
#   9. insert_full_object/1, then insert_activity_with_expiration/3.
# On success the child object (if any) is put back under :object and rich
# media is fetched in a Task, inside a ConcurrentLimiter.limit/2 call.
# The lines after the success body are else arms for values that fell out of
# the chain; most of their bodies are not visible in this excerpt.
116 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
117 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
118 with nil <- Activity.normalize(map),
119 map <- lazy_put_activity_defaults(map, fake),
120 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
121 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
122 {:ok, map} <- MRF.filter(map),
123 {recipients, _, _} = get_recipients(map),
124 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
125 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
126 {:ok, map, object} <- insert_full_object(map),
127 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
128 # Splice in the child object if we have one.
129 activity = Maps.put_if_present(activity, :object, object)
131 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
132 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
# Activity.normalize/1 returned an existing activity.
137 %Activity{} = activity ->
143 {:containment, _} = error ->
146 {:error, _} = error ->
# Fake insert: an %Activity{} struct is built here and rich media is fetched
# for it directly.
149 {:fake, true, map, recipients} ->
150 activity = %Activity{
154 recipients: recipients,
158 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
161 {:remote_limit_pass, _} ->
162 {:error, :remote_limit}
# Builds a struct from `data` (actor from data["actor"], the given
# recipients; other fields are not visible in this excerpt), inserts it with
# Repo.insert/1 and, on success, hands the activity to
# maybe_create_activity_expiration/1.
169 defp insert_activity_with_expiration(data, local, recipients) do
173 actor: data["actor"],
174 recipients: recipients
177 with {:ok, activity} <- Repo.insert(struct) do
178 maybe_create_activity_expiration(activity)
# Post-insert side effects for an activity: creates notifications, creates or
# bumps the conversation for the activity's actor, loads that conversation's
# participations and streams them out. (Some lines of the body are not
# visible in this excerpt.)
182 def notify_and_stream(activity) do
183 Notification.create_notifications(activity)
185 conversation = create_or_bump_conversation(activity, activity.actor)
186 participations = get_participations(conversation)
188 stream_out_participations(participations)
# When the activity's data holds a %DateTime{} under "expires_at", enqueues a
# Pleroma.Workers.PurgeExpiredActivity job with the activity id and that
# timestamp. (How the enqueue result is mapped to the return value is not
# visible in this excerpt.)
191 defp maybe_create_activity_expiration(
192 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
195 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
196 activity_id: activity.id,
197 expires_at: expires_at
# No usable "expires_at": nothing to schedule.
203 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity`; when that succeeds and
# `actor` resolves to a cached %User{}, marks the conversation as read for
# that user via Participation.mark_as_read/2.
# (The success return value is not visible in this excerpt.)
205 defp create_or_bump_conversation(activity, actor) do
206 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
207 %User{} = user <- User.get_cached_by_ap_id(actor) do
208 Participation.mark_as_read(user, conversation)
# Given `{:ok, conversation}`, force-reloads the :participations association
# and returns it.
213 defp get_participations({:ok, conversation}) do
215 |> Repo.preload(:participations, force: true)
216 |> Map.get(:participations)
# Anything else yields an empty list.
219 defp get_participations(_), do: []
# Arity 1: preloads :user on the participations and pushes them to the
# "participation" stream topic.
221 def stream_out_participations(participations) do
224 |> Repo.preload(:user)
226 Streamer.stream("participation", participations)
# Arity 2: for an object with a "context", looks up the conversation for that
# context, preloads its participations, asks for the latest direct activity id
# in the context and streams the participations only when one exists.
# (The options passed to the lookup are not visible in this excerpt.)
230 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
231 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
232 conversation = Repo.preload(conversation, :participations)
235 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
240 if last_activity_id do
241 stream_out_participations(conversation.participations)
# Any other pair of arguments is a no-op.
247 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by Topics.get_activity_topics/1.
250 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
251 when data_type in ["Create", "Announce", "Delete"] do
253 |> Topics.get_activity_topics()
254 |> Streamer.stream(activity)
# Fallback clause for every other activity (body not visible in this excerpt).
258 def stream_out(_activity) do
# Public entry point for creating an activity: runs do_create/2 inside
# Repo.transaction/1. (Unwrapping of the transaction result is not visible in
# this excerpt.)
262 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
263 def create(params, fake \\ false) do
264 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body of create/2.
# `local` is true unless params[:local] is exactly `false`; `quick_insert?` is
# true when Config [:env] is :benchmark.
# Happy path, in order: insert the create data -> stop early for fake inserts
# -> bump the reply count -> stop early for quick inserts -> bump the actor's
# note count -> notify and stream -> federate.
# The tagged tuples `{:fake, ...}` and `{:quick_insert, ...}` exist so the
# else arms can tell the early exits apart; an unmatched failure ends in
# Repo.rollback/1 (its pattern is not visible in this excerpt).
269 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
274 quick_insert? = Config.get([:env]) == :benchmark
278 %{to: to, actor: actor, published: published, context: context, object: object},
282 with {:ok, activity} <- insert(create_data, local, fake),
283 {:fake, false, activity} <- {:fake, fake, activity},
284 _ <- increase_replies_count_if_reply(create_data),
285 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
286 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
287 _ <- notify_and_stream(activity),
288 :ok <- maybe_federate(activity) do
291 {:quick_insert, true, activity} ->
294 {:fake, true, activity} ->
298 Repo.rollback(message)
# Creates a listen activity from `params` (:to, :actor, :context and :object
# are required by the head pattern). `local` is true unless params[:local] is
# exactly `false`. After a successful insert the activity is notified,
# streamed and federated. (Construction of `listen_data` and the success
# return are not visible in this excerpt.)
302 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
303 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
304 additional = params[:additional] || %{}
305 # only accept false as false value
306 local = !(params[:local] == false)
307 published = params[:published]
311 %{to: to, actor: actor, published: published, context: context, object: object},
315 with {:ok, activity} <- insert(listen_data, local),
316 _ <- notify_and_stream(activity),
317 :ok <- maybe_federate(activity) do
# Public entry point for unfollowing: runs do_unfollow/4 inside
# Repo.transaction/1. (Unwrapping of the transaction result is not visible in
# this excerpt.)
322 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
323 {:ok, Activity.t()} | nil | {:error, any()}
324 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
325 with {:ok, result} <-
326 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of unfollow/4: finds the latest follow activity between
# the two users, sets its state to "cancelled", builds and inserts the
# unfollow activity, then notifies, streams and federates it.
# `{:error, error}` results roll the transaction back.
331 defp do_unfollow(follower, followed, activity_id, local) do
332 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
333 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
334 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
335 {:ok, activity} <- insert(unfollow_data, local),
336 _ <- notify_and_stream(activity),
337 :ok <- maybe_federate(activity) do
341 {:error, error} -> Repo.rollback(error)
# flag/1 runs do_flag/1 inside Repo.transaction/1.
# NOTE(review): the `def flag` and `defp do_flag` heads are not visible in
# this excerpt; everything from the "only accept false" comment down appears
# to be the body of do_flag/1.
345 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
347 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
361 # only accept false as false value
362 local = !(params[:local] == false)
363 forward = !(params[:forward] == false)
365 additional = params[:additional] || %{}
# `additional` gets an empty "to" and a "cc" that either contains the reported
# account's ap_id or is empty (the condition choosing between the two is not
# visible in this excerpt).
369 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
371 Map.merge(additional, %{"to" => [], "cc" => []})
# Insert the flag, strip report status data for federation, notify/stream the
# full activity, federate the stripped one, then e-mail every superuser who
# is not the reporter and has an e-mail address.
374 with flag_data <- make_flag_data(params, additional),
375 {:ok, activity} <- insert(flag_data, local),
376 {:ok, stripped_activity} <- strip_report_status_data(activity),
377 _ <- notify_and_stream(activity),
379 maybe_federate(stripped_activity) do
380 User.all_superusers()
381 |> Enum.filter(fn user -> user.ap_id != actor end)
382 |> Enum.filter(fn user -> not is_nil(user.email) end)
383 |> Enum.each(fn superuser ->
385 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
386 |> Pleroma.Emails.Mailer.deliver_async()
391 {:error, error} -> Repo.rollback(error)
# Records an account move from `origin` to `target`.
# Requires origin.ap_id to be listed in target.also_known_as; otherwise
# returns `{:error, "Target account must have the origin in `alsoKnownAs`"}`.
# On success the activity is inserted, notified/streamed and federated, and a
# "move_following" background job is enqueued with both user ids.
395 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
396 def move(%User{} = origin, %User{} = target, local \\ true) do
399 "actor" => origin.ap_id,
400 "object" => origin.ap_id,
401 "target" => target.ap_id
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities belonging to `context`.
# Recipients include the public address plus, in the visible branch, the
# requesting user's ap_id and the users they follow. The query applies the
# optional preloads, block / recipient / filter restrictions, a type + context
# fragment, excludes poll votes and orders by id descending.
# (The type value bound in the fragment is not visible in this excerpt.)
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_recipients(recipients, opts[:user])
435 |> restrict_filtered(opts)
439 "?->>'type' = ? and ?->>'context' = ?",
446 |> exclude_poll_votes(opts)
448 |> order_by([activity], desc: activity.id)
# Runs fetch_activities_for_context_query/2 for `context`.
# (The step that executes the query is not visible in this excerpt.)
451 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
452 def fetch_activities_for_context(context, opts \\ %{}) do
454 |> fetch_activities_for_context_query(opts)
# Context query with preloading skipped by default (caller opts override the
# `skip_preload: true` default) and restricted to "direct" visibility.
# Per the spec, yields an activity id or nil; the selecting/limiting steps are
# not visible in this excerpt.
458 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
459 FlakeId.Ecto.CompatType.t() | nil
460 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
462 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
463 |> restrict_visibility(%{visibility: "direct"})
# Fetches activities addressed to the public collection. The :user option is
# dropped before querying; restrict_unlisted/2 only filters when
# `restrict_unlisted: true` is set in opts. Results are paginated with the
# given pagination type.
469 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
470 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
471 opts = Map.delete(opts, :user)
473 [Constants.as_public()]
474 |> fetch_activities_query(opts)
475 |> restrict_unlisted(opts)
476 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but forces
# `restrict_unlisted: true`.
479 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
480 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
482 |> Map.put(:restrict_unlisted, true)
483 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
486 @valid_visibilities ~w[direct unlisted public private]
# List form: when every entry is a valid visibility, keeps activities whose
# activity_visibility(...) is any of them; otherwise logs an error.
488 defp restrict_visibility(query, %{visibility: visibility})
489 when is_list(visibility) do
490 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
495 "activity_visibility(?, ?, ?) = ANY (?)",
503 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: equality comparison on activity_visibility(...).
507 defp restrict_visibility(query, %{visibility: visibility})
508 when visibility in @valid_visibilities do
512 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Single invalid visibility: logs an error.
# NOTE(review): the only visible statement here is the Logger.error call, so
# this clause appears to return the logger's result rather than a query —
# confirm what callers piping the result onward should receive.
516 defp restrict_visibility(_query, %{visibility: visibility})
517 when visibility not in @valid_visibilities do
518 Logger.error("Could not restrict visibility to #{visibility}")
# No :visibility option: query is returned untouched.
521 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by :exclude_visibilities.
# List form: validated against @valid_visibilities; an invalid entry logs an
# error. (How the fragment is negated is not visible in this excerpt.)
523 defp exclude_visibility(query, %{exclude_visibilities: visibility})
524 when is_list(visibility) do
525 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
530 "activity_visibility(?, ?, ?) = ANY (?)",
538 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
543 defp exclude_visibility(query, %{exclude_visibilities: visibility})
544 when visibility in @valid_visibilities do
549 "activity_visibility(?, ?, ?) = ?",
# Single invalid, non-nil visibility: logs an error.
# NOTE(review): as with restrict_visibility/2, only the Logger.error call is
# visible for this clause — confirm the intended return value.
558 defp exclude_visibility(query, %{exclude_visibilities: visibility})
559 when visibility not in [nil | @valid_visibilities] do
560 Logger.error("Could not exclude visibility to #{visibility}")
# Option absent (or nil): query is returned untouched.
564 defp exclude_visibility(query, _visibility), do: query
# Thread containment filter. The first two clauses match when containment is
# switched off, either in the config map (third argument) or on the user
# (their bodies are not visible in this excerpt). With a user that has an
# ap_id, keeps only activities for which the SQL function
# thread_visibility(ap_id, activity id) is true. Without a user the query is
# returned untouched.
566 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
569 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
572 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
575 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
579 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`.
# Sets :user to the reader and :actor_id to the author's ap_id, derives the
# recipient list from the reader (or none in godmode) and delegates to
# fetch_activities/2. (Some parameter setup is not visible in this excerpt.)
581 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
584 |> Map.put(:user, reading_user)
585 |> Map.put(:actor_id, user.ap_id)
588 godmode: params[:godmode],
589 reading_user: reading_user
591 |> user_activities_recipients()
592 |> fetch_activities(params)
# Fetches a user's activities for a reader via fetch_activities_for_user/3.
# With `total: true` the result is a keyword list whose :items entry is
# reversed here. (Post-processing in the plain clause is not visible in this
# excerpt.)
596 def fetch_user_activities(user, reading_user, params \\ %{})
598 def fetch_user_activities(user, reading_user, %{total: true} = params) do
599 result = fetch_activities_for_user(user, reading_user, params)
601 Keyword.put(result, :items, Enum.reverse(result[:items]))
604 def fetch_user_activities(user, reading_user, params) do
606 |> fetch_activities_for_user(reading_user, params)
# Shared implementation behind fetch_user_activities/3.
# Restricts to "Create" and "Announce" activities by `user`, passes along the
# user's pinned activity ids, and branches on User.blocks?(reading_user, user)
# when deciding whether to set :blocking_user / :muting_user (the branch
# layout is not visible in this excerpt). Pagination type comes from
# params[:pagination_type], defaulting to :keyset.
610 defp fetch_activities_for_user(user, reading_user, params) do
613 |> Map.put(:type, ["Create", "Announce"])
614 |> Map.put(:user, reading_user)
615 |> Map.put(:actor_id, user.ap_id)
616 |> Map.put(:pinned_activity_ids, user.pinned_activities)
619 if User.blocks?(reading_user, user) do
623 |> Map.put(:blocking_user, reading_user)
624 |> Map.put(:muting_user, reading_user)
627 pagination_type = Map.get(params, :pagination_type) || :keyset
630 godmode: params[:godmode],
631 reading_user: reading_user
633 |> user_activities_recipients()
634 |> fetch_activities(params, pagination_type)
# Fetches statuses visible to `reading_user` via
# fetch_activities_for_reading_user/2. With `total: true` the result is a
# keyword list whose :items entry is reversed here. (Post-processing in the
# plain clause is not visible in this excerpt.)
637 def fetch_statuses(reading_user, %{total: true} = params) do
638 result = fetch_activities_for_reading_user(reading_user, params)
639 Keyword.put(result, :items, Enum.reverse(result[:items]))
642 def fetch_statuses(reading_user, params) do
644 |> fetch_activities_for_reading_user(params)
# Restricts to "Create" and "Announce" activities, derives recipients from
# the reader (none in godmode) and fetches with :offset pagination.
648 defp fetch_activities_for_reading_user(reading_user, params) do
649 params = Map.put(params, :type, ["Create", "Announce"])
652 godmode: params[:godmode],
653 reading_user: reading_user
655 |> user_activities_recipients()
656 |> fetch_activities(params, :offset)
# Recipient list used when reading a user's activities.
# Godmode: empty list (restrict_recipients/3 treats `[]` as "no restriction").
659 defp user_activities_recipients(%{godmode: true}), do: []
# Otherwise: public plus the reader's own ap_id and followings in one branch,
# or public only in the other (the condition is not visible in this excerpt;
# presumably it is whether a reading user is present — confirm).
661 defp user_activities_recipients(%{reading_user: reading_user}) do
663 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
665 [Constants.as_public()]
# Filter driven by :announce_filtering_user. It needs the joined object, so
# combining it with `skip_preload: true` raises.
669 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
670 raise "Can't use the child object without preloading!"
# Keeps rows where the first JSON value's type differs from one bound value or
# the second JSON value's actor differs from another (the bound arguments are
# not visible in this excerpt).
673 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
675 [activity, object] in query,
678 "?->>'type' != ? or ?->>'actor' != ?",
687 defp restrict_announce_object_actor(query, _), do: query
# Keeps activities with an id greater than :since_id. An empty-string
# since_id or a missing option leaves the query untouched.
689 defp restrict_since(query, %{since_id: ""}), do: query
691 defp restrict_since(query, %{since_id: since_id}) do
692 from(activity in query, where: activity.id > ^since_id)
695 defp restrict_since(query, _), do: query
# Embedded-tag variant of the :tag_all filter (matches against the object's
# JSON "tag" field). Requires the joined object, so `skip_preload: true`
# raises.
697 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
698 raise_on_missing_preload()
# Non-empty list: object "tag" must contain all of the given tags (`?&`).
701 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
703 [_activity, object] in query,
704 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
# Single binary tag: "all of one" equals "any of one", so delegate.
708 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
709 restrict_embedded_tag_any(query, %{tag: tag})
712 defp restrict_embedded_tag_all(query, _), do: query
# Embedded-tag variant of the :tag filter. Requires the joined object, so
# `skip_preload: true` raises.
714 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
715 raise_on_missing_preload()
# Non-empty list: object "tag" must contain at least one given tag (`?|`).
718 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
720 [_activity, object] in query,
721 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
# Single binary tag is wrapped into a one-element list.
725 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
726 restrict_embedded_tag_any(query, %{tag: [tag]})
729 defp restrict_embedded_tag_any(query, _), do: query
# Embedded-tag variant of the :tag_reject filter. Requires the joined object,
# so `skip_preload: true` raises.
731 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
732 raise_on_missing_preload()
# Non-empty list: drops objects whose "tag" contains any rejected tag.
735 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
737 [_activity, object] in query,
738 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
# Single binary tag is wrapped into a one-element list.
742 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
743 when is_binary(tag_reject) do
744 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
747 defp restrict_embedded_tag_reject_any(query, _), do: query
# Hashtag-table variant of the :tag_all filter. Requires the joined object,
# so `skip_preload: true` raises.
749 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
750 raise_on_missing_preload()
# Exactly one tag: delegate to the "any" variant.
753 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
754 restrict_hashtag_any(query, %{tag: single_tag})
# Several tags: the hashtag names linked to the object (limited to the
# requested names) must contain (`@>`) a bound array — presumably the full
# requested tag list; the bound arguments are not visible in this excerpt.
757 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
759 [_activity, object] in query,
763 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
764 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
765 AND hashtags_objects.object_id = ?) @> ?
# Single binary tag is wrapped into a one-element list.
774 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
775 restrict_hashtag_all(query, %{tag_all: [tag]})
778 defp restrict_hashtag_all(query, _), do: query
# Hashtag-table variant of the :tag filter. Requires the joined object, so
# `skip_preload: true` raises.
780 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
781 raise_on_missing_preload()
# Non-empty list: EXISTS sub-select over hashtags/hashtags_objects filtering
# on hashtag name and on the object id.
784 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
786 [_activity, object] in query,
790 EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
791 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
792 AND hashtags_objects.object_id = ? LIMIT 1)
# Single binary tag is wrapped into a one-element list.
800 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
801 restrict_hashtag_any(query, %{tag: [tag]})
804 defp restrict_hashtag_any(query, _), do: query
# Hashtag-table variant of the :tag_reject filter. Requires the joined
# object, so `skip_preload: true` raises.
806 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
807 raise_on_missing_preload()
# Non-empty list: NOT EXISTS sub-select over hashtags/hashtags_objects
# filtering on hashtag name and on the object id.
810 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
812 [_activity, object] in query,
816 NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
817 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
818 AND hashtags_objects.object_id = ? LIMIT 1)
# Single binary tag is wrapped into a one-element list.
826 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
827 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
830 defp restrict_hashtag_reject_any(query, _), do: query
# Shared failure for filters that need the joined object but were called with
# `skip_preload: true`. Always raises a RuntimeError.
832 defp raise_on_missing_preload do
833 raise "Can't use the child object without preloading!"
# Recipient filter. An empty recipient list means "no restriction".
836 defp restrict_recipients(query, [], _user), do: query
# No user: activity recipients must overlap (`&&`) the given list.
838 defp restrict_recipients(query, recipients, nil) do
839 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: same overlap test, plus an `or_where` that also admits
# activities authored by the user.
842 defp restrict_recipients(query, recipients, user) do
845 where: fragment("? && ?", ^recipients, activity.recipients),
846 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged local; otherwise the
# query is returned untouched.
850 defp restrict_local(query, %{local_only: true}) do
851 from(activity in query, where: activity.local == true)
854 defp restrict_local(query, _), do: query
# With `remote: true`, keeps only activities that are not local; otherwise
# the query is returned untouched.
856 defp restrict_remote(query, %{remote: true}) do
857 from(activity in query, where: activity.local == false)
860 defp restrict_remote(query, _), do: query
# With :actor_id set, keeps only activities whose actor equals it.
862 defp restrict_actor(query, %{actor_id: actor_id}) do
863 from(activity in query, where: activity.actor == ^actor_id)
866 defp restrict_actor(query, _), do: query
# Filters on the activity's JSON "type": equality for a single binary type,
# `= ANY(...)` for any other :type value (expected to be a list).
868 defp restrict_type(query, %{type: type}) when is_binary(type) do
869 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
872 defp restrict_type(query, %{type: type}) do
873 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
876 defp restrict_type(query, _), do: query
# With :state set, keeps only activities whose JSON "state" equals it.
878 defp restrict_state(query, %{state: state}) do
879 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
882 defp restrict_state(query, _), do: query
# With :favorited_by set, keeps only rows whose joined object's "likes"
# contains the given ap_id (JSONB `?` operator).
884 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
886 [_activity, object] in query,
887 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
891 defp restrict_favorited_by(query, _), do: query
# Media-only filter. Needs the joined object, so any :only_media value
# combined with `skip_preload: true` raises.
893 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
894 raise "Can't use the child object without preloading!"
# `only_media: true`: keeps "Create" activities whose object's "attachment"
# is not equal to the empty list.
897 defp restrict_media(query, %{only_media: true}) do
899 [activity, object] in query,
900 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
901 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
905 defp restrict_media(query, _), do: query
# Reply filtering.
# `exclude_replies: true`: keeps only objects without an "inReplyTo".
907 defp restrict_replies(query, %{exclude_replies: true}) do
909 [_activity, object] in query,
910 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self": keeps non-replies, or rows where a bound value is
# in a bound array (the bound arguments are not visible in this excerpt).
914 defp restrict_replies(query, %{
915 reply_filtering_user: %User{} = user,
916 reply_visibility: "self"
919 [activity, object] in query,
922 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following": the conditions are explained by the inline SQL
# comments below; the list bound after the fragment is the user plus their
# cached friends' ap_ids.
930 defp restrict_replies(query, %{
931 reply_filtering_user: %User{} = user,
932 reply_visibility: "following"
935 [activity, object] in query,
939 ?->>'type' != 'Create' -- This isn't a Create
940 OR ?->>'inReplyTo' is null -- this isn't a reply
941 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
942 -- unless they are the author (because authors
943 -- are also part of the recipients). This leads
944 -- to a bug that self-replies by friends won't
946 OR ? = ? -- The actor is us
950 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
959 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose JSON "type" is
# "Announce".
961 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
962 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
965 defp restrict_reblogs(query, _), do: query
# Mute filtering. `with_muted: true` disables it entirely.
967 defp restrict_muted(query, %{with_muted: true}), do: query
# With a :muting_user: uses the pre-computed :muted_users_ap_ids from opts
# when present, otherwise loads them for the user. Drops activities authored
# by muted actors and applies a further "to"-based fragment (its bound
# arguments are not visible in this excerpt). Unless preloading was skipped,
# also requires that the thread_mute binding has no user_id.
969 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
970 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
973 from([activity] in query,
974 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
977 "not (?->'to' \\?| ?) or ? = ?",
985 unless opts[:skip_preload] do
986 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
992 defp restrict_muted(query, _), do: query
# Block filtering for a :blocking_user.
# Uses pre-computed :blocked_users_ap_ids from opts when present, otherwise
# loads them; also reads the user's domain blocks and friends' ap_ids.
# Ensures the query has a named :object binding before filtering.
# Visible conditions: actor not blocked; recipients not overlapping a bound
# list (with an equality escape hatch); recipients not containing blocked
# domains; no Announce whose "to" hits a bound list; and two host-based
# checks (split_part(..., '/', 3)) on a bound value and on a JSON "actor",
# each with an `= ANY` exception list. (Bound arguments are mostly not visible
# in this excerpt.)
994 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
995 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
996 domain_blocks = user.domain_blocks || []
998 following_ap_ids = User.get_friends_ap_ids(user)
1001 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1004 [activity, object: o] in query,
1005 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1008 "((not (? && ?)) or ? = ?)",
1009 activity.recipients,
1016 "recipients_contain_blocked_domains(?, ?) = false",
1017 activity.recipients,
1022 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1029 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1037 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1046 defp restrict_blocked(query, _), do: query
# With `restrict_unlisted: true`, drops rows whose "cc" (treated as empty
# when missing) contains the public address.
1048 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1053 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1055 ^[Constants.as_public()]
1060 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and a list of pinned ids, keeps only activities whose
# id is in that list.
1062 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1063 from(activity in query, where: activity.id in ^ids)
1066 defp restrict_pinned(query, _), do: query
# For a :muting_user, drops "Announce" activities matching a bound list.
# The list comes from opts[:reblog_muted_users_ap_ids] when present, otherwise
# it is loaded for the user. (The fragment's bound arguments are not visible
# in this excerpt.)
1068 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1069 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1075 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1083 defp restrict_muted_reblogs(query, _), do: query
# With a binary :instance, keeps only activities whose actor URL has that
# host (third '/'-separated part of the actor column).
1085 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1088 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1092 defp restrict_instance(query, _), do: query
# Applies the user's content filters: composes a regex from them and, in the
# visible branch, keeps rows whose object "content" does not match it
# (case-insensitively, `~*`) or that were authored by the user themselves.
# (The case patterns are not visible in this excerpt.)
1094 defp restrict_filtered(query, %{user: %User{} = user}) do
1095 case Filter.compose_regex(user) do
1100 from([activity, object] in query,
1102 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1103 activity.actor == ^user.ap_id
# Falls back to :blocking_user when no :user is given.
1108 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1109 restrict_filtered(query, %{user: user})
1112 defp restrict_filtered(query, _), do: query
# Poll votes are excluded unless `include_poll_votes: true` is passed.
1114 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
# When the query has a named :object binding, drops rows whose object type is
# "Answer". (The branch for a query without that binding is not visible in
# this excerpt.)
1116 defp exclude_poll_votes(query, _) do
1117 if has_named_binding?(query, :object) do
1118 from([activity, object: o] in query,
1119 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Chat messages are excluded unless `include_chat_messages: true` is passed.
1126 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
# When the query has a named :object binding, drops rows whose object type is
# "ChatMessage". (The branch for a query without that binding is not visible
# in this excerpt.)
1128 defp exclude_chat_messages(query, _) do
1129 if has_named_binding?(query, :object) do
1130 from([activity, object: o] in query,
1131 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Activities by invisible actors are excluded unless `invisible_actors: true`
# is passed.
1138 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
# Builds a user query for invisible users selecting only :ap_id, collects the
# ap_ids and drops activities whose actor is among them.
1140 defp exclude_invisible_actors(query, _opts) do
1142 User.Query.build(%{invisible: true, select: [:ap_id]})
1144 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1146 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id, drops the activity with that id.
1149 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1150 from(activity in query, where: activity.id != ^id)
1153 defp exclude_id(query, _), do: query
# Preloads the activity's object via Activity.with_preloaded_object/1 unless
# `skip_preload: true` is set.
1155 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1157 defp maybe_preload_objects(query, _) do
1159 |> Activity.with_preloaded_object()
# Preloads the bookmark for opts[:user] via Activity.with_preloaded_bookmark/2
# unless `skip_preload: true` is set.
1162 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1164 defp maybe_preload_bookmarks(query, opts) do
1166 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes only when `preload_report_notes: true` is set.
1169 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1171 |> Activity.with_preloaded_report_notes()
1174 defp maybe_preload_report_notes(query, _), do: query
# Adds the thread-muted field for opts[:muting_user] (falling back to
# opts[:user]) unless `skip_preload: true` is set.
1176 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1178 defp maybe_set_thread_muted_field(query, opts) do
1180 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id descending for `order: :desc`, ascending for `order: :asc`,
# and leaves the query unordered otherwise.
1183 defp maybe_order(query, %{order: :desc}) do
1185 |> order_by(desc: :id)
1188 defp maybe_order(query, %{order: :asc}) do
1190 |> order_by(asc: :id)
1193 defp maybe_order(query, _), do: query
# Normalizes the hashtag options :tag, :tag_all and :tag_reject with
# Hashtag.normalize_name/1 — directly for a bitstring value, element-wise for
# a list. (Handling of other values is not visible in this excerpt.)
1195 defp normalize_fetch_activities_query_opts(opts) do
1196 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1198 value when is_bitstring(value) ->
1199 Map.put(opts, key, Hashtag.normalize_name(value))
1201 value when is_list(value) ->
1202 Map.put(opts, key, Enum.map(value, &Hashtag.normalize_name/1))
# Pre-loads relationship ap_ids for the muting user in a single call and
# returns three option maps for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2.
# :mute and :reblog_mute are requested when a muting user is present; :block
# is added only when the blocking user is that same user. Because the
# preloaded values are the left side of Map.merge/2, keys already present in
# `opts` take precedence.
1210 defp fetch_activities_query_ap_ids_ops(opts) do
1211 source_user = opts[:muting_user]
1212 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1214 ap_id_relationships =
1215 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1216 [:block | ap_id_relationships]
1221 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1223 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1224 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1226 restrict_muted_reblogs_opts =
1227 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1229 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity listing query for `recipients` and `opts`.
# Steps: normalize hashtag options, pre-compute the block/mute option maps,
# read the :skip_thread_containment setting, then pipe the base query through
# every preload, ordering, restrict_* and exclude_* helper below. Each helper
# is a no-op when its option is absent. Finally the hashtag filters are
# applied — the hashtag-table variants when the :improved_hashtag_timeline
# feature is enabled, the embedded-tag variants otherwise.
# (The base query and the surrounding assignments are not visible in this
# excerpt.)
1232 def fetch_activities_query(recipients, opts \\ %{}) do
1233 opts = normalize_fetch_activities_query_opts(opts)
1235 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1236 fetch_activities_query_ap_ids_ops(opts)
1239 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1244 |> maybe_preload_objects(opts)
1245 |> maybe_preload_bookmarks(opts)
1246 |> maybe_preload_report_notes(opts)
1247 |> maybe_set_thread_muted_field(opts)
1248 |> maybe_order(opts)
1249 |> restrict_recipients(recipients, opts[:user])
1250 |> restrict_replies(opts)
1251 |> restrict_since(opts)
1252 |> restrict_local(opts)
1253 |> restrict_remote(opts)
1254 |> restrict_actor(opts)
1255 |> restrict_type(opts)
1256 |> restrict_state(opts)
1257 |> restrict_favorited_by(opts)
1258 |> restrict_blocked(restrict_blocked_opts)
1259 |> restrict_muted(restrict_muted_opts)
1260 |> restrict_filtered(opts)
1261 |> restrict_media(opts)
1262 |> restrict_visibility(opts)
1263 |> restrict_thread_visibility(opts, config)
1264 |> restrict_reblogs(opts)
1265 |> restrict_pinned(opts)
1266 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1267 |> restrict_instance(opts)
1268 |> restrict_announce_object_actor(opts)
# NOTE(review): restrict_filtered(opts) was already applied earlier in this
# pipeline, so this second call looks redundant — confirm before removing.
1269 |> restrict_filtered(opts)
1270 |> Activity.restrict_deactivated_users()
1271 |> exclude_poll_votes(opts)
1272 |> exclude_chat_messages(opts)
1273 |> exclude_invisible_actors(opts)
1274 |> exclude_visibility(opts)
1276 if Config.feature_enabled?(:improved_hashtag_timeline) do
1278 |> restrict_hashtag_any(opts)
1279 |> restrict_hashtag_all(opts)
1280 |> restrict_hashtag_reject_any(opts)
1283 |> restrict_embedded_tag_any(opts)
1284 |> restrict_embedded_tag_all(opts)
1285 |> restrict_embedded_tag_reject_any(opts)
# Fetches a page of activities for `recipients`.
# The list memberships of opts[:user] are appended to the recipients so that
# list-addressed activities are found; after pagination,
# maybe_update_cc/3 adjusts those activities. (One pipeline step between
# pagination and maybe_update_cc is not visible in this excerpt.)
1289 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1290 list_memberships = Pleroma.List.memberships(opts[:user])
1292 fetch_activities_query(recipients ++ list_memberships, opts)
1293 |> Pagination.fetch_paginated(opts, pagination)
1295 |> maybe_update_cc(list_memberships, opts[:user])
1299 Fetches the activities a user has favourited, ordered by when they were added to favourites.
# Query shape: the user's "Like" activities, joined to the liked object and
# from there to an activity for that object. Each result is that activity
# with :object set to the liked object and :pagination_id set to the Like's
# id, ordered by Like id descending (nulls last). `skip_order: true` is merged
# into the pagination params alongside this explicit ordering.
1301 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1302 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1304 |> Activity.Queries.by_actor()
1305 |> Activity.Queries.by_type("Like")
1306 |> Activity.with_joined_object()
1307 |> Object.with_joined_activity()
1308 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1309 |> order_by([like, _, _], desc_nulls_last: like.id)
1310 |> Pagination.fetch_paginated(
1311 Map.merge(params, %{skip_order: true}),
# When the user has list memberships: for every activity with a non-empty
# "bcc" that includes one of those lists, prepends the user's ap_id to the
# activity's "cc". (The non-matching branches are not visible in this
# excerpt.)
1316 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1317 Enum.map(activities, fn
1318 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1319 if Enum.any?(bcc, &(&1 in list_memberships)) do
1320 update_in(activity.data["cc"], &[user_ap_id | &1])
# No memberships or no user: activities are returned unchanged.
1330 defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
1332 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1333 from(activity in query,
1335 fragment("? && ?", activity.recipients, ^recipients) or
1336 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1337 ^Constants.as_public() in activity.recipients)
@doc """
Fetches one page of activities bounded by two recipient sets.

An activity qualifies when its recipients overlap `recipients`, or when they
overlap `recipients_with_public` and also include the public collection. The
page returned by `Pagination.fetch_paginated/3` is reversed before it is
returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` for it.

When `opts[:actor]` is present it is recorded under the object's "actor" key.
Any non-`{:ok, _}` result from `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
# Extracts a URL string from an actor's "url" value.
#
# Accepts a plain string, a map with a string "href", or a list (only its first
# element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    candidates when is_list(candidates) -> candidates |> List.first() |> get_actor_url()
    _ -> nil
  end
end
# Converts a remote actor document (string-keyed map) into the atom-keyed
# attribute map that callers pass on to `User.remote_user_changeset`.
#
# The document comes from a remote server, so optional parts are read
# defensively: malformed icons, attachments and tags are skipped instead of
# raising.
#
# The avatar, banner, fields, emoji, lock flag and capabilities are read from
# the document as received; everything after the call to
# `Transmogrifier.maybe_fix_user_object/1` is read from the fixed-up document.
defp object_to_user_data(data) do
  avatar = actor_image(data["icon"])
  banner = actor_image(data["image"])

  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Only Emoji tags that carry both a string name and an icon URL are usable.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: nested_binary(data, "publicKey", "publicKeyPem"),
    inbox: data["inbox"],
    shared_inbox: nested_binary(data, "endpoints", "sharedInbox"),
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end

# Builds the stored image map from an actor's "icon"/"image" value. A list is
# reduced to its first element; anything without a string "url" yields nil.
defp actor_image(%{"url" => url}) when is_binary(url) do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp actor_image([first | _]), do: actor_image(first)
defp actor_image(_), do: nil

# Returns data[outer][inner] when `outer` holds a map and the inner value is a
# string; nil otherwise.
defp nested_binary(data, outer, inner) do
  case data do
    %{^outer => %{^inner => value}} when is_binary(value) -> value
    _ -> nil
  end
end
@doc """
Fetches the remote following and follower collections of `user`.

Returns `{:ok, map}` with `:follower_count`, `:following_count`,
`:hide_follows` and `:hide_followers`. `{:error, _}` results from any step are
returned unchanged; any other unexpected result is wrapped as `{:error, result}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
@doc """
Merges freshly fetched follow information into `user_data` under `:info`.

This only happens when `[:instance, :external_user_synchronization]` is
enabled, `user_data[:type]` is "Person" or "Service", and both collection
addresses are present. In every other case `user_data` is returned unchanged;
results other than the three expected `false` checks are logged as errors.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the check reads `:type`, while `object_to_user_data/1` in this
  # file stores the actor type under `:actor_type` - confirm which key callers
  # are expected to provide.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
# Decides whether a follow collection is hidden.
#
# Returns {:ok, false} when the collection's first page is embedded or can be
# fetched as a (Ordered)CollectionPage, and {:ok, true} when fetching it answers
# 401/403 or when the collection has no "first" at all. Other failures are
# returned as {:error, _}.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs an actor document through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`; any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
@doc """
Fetches the actor at `ap_id` and turns it into user data.

Returns `{:ok, user_data}` on success. On `{:error, reason}` the failure is
logged and returned unchanged; the log level depends on the reason (debug for
deleted objects, info for rejections, error otherwise).
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, rejection} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(rejection)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      {:error, reason}
  end
end
@doc """
Renames an existing user whose nickname clashes with incoming user `data`.

When another user (different AP id) already holds `data[:nickname]`, that user
is renamed to `"<id>.<nickname>"`. If the existing user has the same AP id,
nothing is changed and the situation is only logged. Returns `nil` when there
is no string nickname or no existing user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      old_user
      |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched; an
existing cached user is updated with the new data, and a missing one is
inserted after any nickname clash has been handled. A failed fetch is returned
unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached = User.get_cached_by_ap_id(ap_id)

  cond do
    cached && !User.ap_enabled?(cached) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if cached do
            changeset = User.remote_user_changeset(cached, data)
            User.update_and_set_cache(changeset)
          else
            maybe_handle_clashing_nickname(data)

            inserted =
              data
              |> User.remote_user_changeset()
              |> Repo.insert()

            User.set_cache(inserted)
          end

        failure ->
          failure
      end
  end
end
@doc """
Resolves `nickname` through WebFinger and creates or refreshes that user.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields no
AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently this only applies the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1627 def fetch_direct_messages_query do
1629 |> restrict_type(%{type: "Create"})
1630 |> restrict_visibility(%{visibility: "direct"})
1631 |> order_by([activity], asc: activity.id)