1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
70 defp check_actor_is_active(actor) do
71 if not is_nil(actor) do
72 with user <- User.get_cached_by_ap_id(actor),
73 false <- user.deactivated do
83 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
84 limit = Config.get([:instance, :remote_limit])
85 String.length(content) <= limit
88 defp check_remote_limit(_), do: true
90 def increase_note_count_if_public(actor, object) do
91 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
94 def decrease_note_count_if_public(actor, object) do
95 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
98 def increase_replies_count_if_reply(%{
99 "object" => %{"inReplyTo" => reply_ap_id} = object,
102 if is_public?(object) do
103 Object.increase_replies_count(reply_ap_id)
107 def increase_replies_count_if_reply(_create_data), do: :noop
109 def decrease_replies_count_if_reply(%Object{
110 data: %{"inReplyTo" => reply_ap_id} = object
112 if is_public?(object) do
113 Object.decrease_replies_count(reply_ap_id)
117 def decrease_replies_count_if_reply(_object), do: :noop
119 def increase_poll_votes_if_vote(%{
120 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
124 Object.increase_vote_count(reply_ap_id, name, actor)
127 def increase_poll_votes_if_vote(_create_data), do: :noop
129 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
130 def persist(object, meta) do
131 with local <- Keyword.fetch!(meta, :local),
132 {recipients, _, _} <- get_recipients(object),
134 Repo.insert(%Activity{
137 recipients: recipients,
138 actor: object["actor"]
140 {:ok, activity, meta}
144 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
145 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
146 with nil <- Activity.normalize(map),
147 map <- lazy_put_activity_defaults(map, fake),
148 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
149 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
150 {:ok, map} <- MRF.filter(map),
151 {recipients, _, _} = get_recipients(map),
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
156 Repo.insert(%Activity{
160 recipients: recipients
163 # Splice in the child object if we have one.
164 activity = Maps.put_if_present(activity, :object, object)
166 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
170 %Activity{} = activity ->
173 {:fake, true, map, recipients} ->
174 activity = %Activity{
178 recipients: recipients,
182 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
190 def notify_and_stream(activity) do
191 Notification.create_notifications(activity)
193 conversation = create_or_bump_conversation(activity, activity.actor)
194 participations = get_participations(conversation)
196 stream_out_participations(participations)
199 defp create_or_bump_conversation(activity, actor) do
200 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
201 %User{} = user <- User.get_cached_by_ap_id(actor),
202 Participation.mark_as_read(user, conversation) do
207 defp get_participations({:ok, conversation}) do
209 |> Repo.preload(:participations, force: true)
210 |> Map.get(:participations)
213 defp get_participations(_), do: []
215 def stream_out_participations(participations) do
218 |> Repo.preload(:user)
220 Streamer.stream("participation", participations)
223 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
224 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
225 conversation = Repo.preload(conversation, :participations),
227 fetch_latest_activity_id_for_context(conversation.ap_id, %{
229 "blocking_user" => user
231 if last_activity_id do
232 stream_out_participations(conversation.participations)
237 def stream_out_participations(_, _), do: :noop
239 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
240 when data_type in ["Create", "Announce", "Delete"] do
242 |> Topics.get_activity_topics()
243 |> Streamer.stream(activity)
246 def stream_out(_activity) do
250 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
251 def create(params, fake \\ false) do
252 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
257 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
258 additional = params[:additional] || %{}
259 # only accept false as false value
260 local = !(params[:local] == false)
261 published = params[:published]
262 quick_insert? = Config.get([:env]) == :benchmark
266 %{to: to, actor: actor, published: published, context: context, object: object},
269 {:ok, activity} <- insert(create_data, local, fake),
270 {:fake, false, activity} <- {:fake, fake, activity},
271 _ <- increase_replies_count_if_reply(create_data),
272 _ <- increase_poll_votes_if_vote(create_data),
273 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
274 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
275 _ <- notify_and_stream(activity),
276 :ok <- maybe_federate(activity) do
279 {:quick_insert, true, activity} ->
282 {:fake, true, activity} ->
286 Repo.rollback(message)
290 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
291 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
292 additional = params[:additional] || %{}
293 # only accept false as false value
294 local = !(params[:local] == false)
295 published = params[:published]
299 %{to: to, actor: actor, published: published, context: context, object: object},
302 {:ok, activity} <- insert(listen_data, local),
303 _ <- notify_and_stream(activity),
304 :ok <- maybe_federate(activity) do
309 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
310 def accept(params) do
311 accept_or_reject("Accept", params)
314 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
315 def reject(params) do
316 accept_or_reject("Reject", params)
319 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
320 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
321 local = Map.get(params, :local, true)
322 activity_id = Map.get(params, :activity_id, nil)
325 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
326 |> Maps.put_if_present("id", activity_id),
327 {:ok, activity} <- insert(data, local),
328 _ <- notify_and_stream(activity),
329 :ok <- maybe_federate(activity) do
334 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
335 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
336 local = !(params[:local] == false)
337 activity_id = params[:activity_id]
346 data <- Maps.put_if_present(data, "id", activity_id),
347 {:ok, activity} <- insert(data, local),
348 _ <- notify_and_stream(activity),
349 :ok <- maybe_federate(activity) do
354 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
355 {:ok, Activity.t()} | {:error, any()}
356 def follow(follower, followed, activity_id \\ nil, local \\ true) do
357 with {:ok, result} <-
358 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
363 defp do_follow(follower, followed, activity_id, local) do
364 with data <- make_follow_data(follower, followed, activity_id),
365 {:ok, activity} <- insert(data, local),
366 _ <- notify_and_stream(activity),
367 :ok <- maybe_federate(activity) do
370 {:error, error} -> Repo.rollback(error)
374 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
375 {:ok, Activity.t()} | nil | {:error, any()}
376 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
377 with {:ok, result} <-
378 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
383 defp do_unfollow(follower, followed, activity_id, local) do
384 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
385 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
386 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
387 {:ok, activity} <- insert(unfollow_data, local),
388 _ <- notify_and_stream(activity),
389 :ok <- maybe_federate(activity) do
393 {:error, error} -> Repo.rollback(error)
397 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
398 {:ok, Activity.t()} | {:error, any()}
399 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
400 with {:ok, result} <-
401 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
406 defp do_block(blocker, blocked, activity_id, local) do
407 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
409 if unfollow_blocked do
410 follow_activity = fetch_latest_follow(blocker, blocked)
411 if follow_activity, do: unfollow(blocker, blocked, nil, local)
414 with block_data <- make_block_data(blocker, blocked, activity_id),
415 {:ok, activity} <- insert(block_data, local),
416 _ <- notify_and_stream(activity),
417 :ok <- maybe_federate(activity) do
420 {:error, error} -> Repo.rollback(error)
424 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
434 # only accept false as false value
435 local = !(params[:local] == false)
436 forward = !(params[:forward] == false)
438 additional = params[:additional] || %{}
442 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
444 Map.merge(additional, %{"to" => [], "cc" => []})
447 with flag_data <- make_flag_data(params, additional),
448 {:ok, activity} <- insert(flag_data, local),
449 {:ok, stripped_activity} <- strip_report_status_data(activity),
450 _ <- notify_and_stream(activity),
451 :ok <- maybe_federate(stripped_activity) do
452 User.all_superusers()
453 |> Enum.filter(fn user -> not is_nil(user.email) end)
454 |> Enum.each(fn superuser ->
456 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
457 |> Pleroma.Emails.Mailer.deliver_async()
464 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
465 def move(%User{} = origin, %User{} = target, local \\ true) do
468 "actor" => origin.ap_id,
469 "object" => origin.ap_id,
470 "target" => target.ap_id
473 with true <- origin.ap_id in target.also_known_as,
474 {:ok, activity} <- insert(params, local),
475 _ <- notify_and_stream(activity) do
476 maybe_federate(activity)
478 BackgroundWorker.enqueue("move_following", %{
479 "origin_id" => origin.id,
480 "target_id" => target.id
485 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
490 def fetch_activities_for_context_query(context, opts) do
491 public = [Constants.as_public()]
495 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
498 from(activity in Activity)
499 |> maybe_preload_objects(opts)
500 |> maybe_preload_bookmarks(opts)
501 |> maybe_set_thread_muted_field(opts)
502 |> restrict_blocked(opts)
503 |> restrict_recipients(recipients, opts["user"])
507 "?->>'type' = ? and ?->>'context' = ?",
514 |> exclude_poll_votes(opts)
516 |> order_by([activity], desc: activity.id)
519 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
520 def fetch_activities_for_context(context, opts \\ %{}) do
522 |> fetch_activities_for_context_query(opts)
526 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
527 FlakeId.Ecto.CompatType.t() | nil
528 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
530 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
536 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
537 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
538 opts = Map.drop(opts, ["user"])
540 query = fetch_activities_query([Constants.as_public()], opts)
543 if opts["restrict_unlisted"] do
544 restrict_unlisted(query)
549 Pagination.fetch_paginated(query, opts, pagination)
552 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
553 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
555 |> Map.put("restrict_unlisted", true)
556 |> fetch_public_or_unlisted_activities(pagination)
559 @valid_visibilities ~w[direct unlisted public private]
561 defp restrict_visibility(query, %{visibility: visibility})
562 when is_list(visibility) do
563 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
569 "activity_visibility(?, ?, ?) = ANY (?)",
579 Logger.error("Could not restrict visibility to #{visibility}")
583 defp restrict_visibility(query, %{visibility: visibility})
584 when visibility in @valid_visibilities do
588 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
592 defp restrict_visibility(_query, %{visibility: visibility})
593 when visibility not in @valid_visibilities do
594 Logger.error("Could not restrict visibility to #{visibility}")
597 defp restrict_visibility(query, _visibility), do: query
599 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
600 when is_list(visibility) do
601 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
606 "activity_visibility(?, ?, ?) = ANY (?)",
614 Logger.error("Could not exclude visibility to #{visibility}")
619 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
620 when visibility in @valid_visibilities do
625 "activity_visibility(?, ?, ?) = ?",
634 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
635 when visibility not in [nil | @valid_visibilities] do
636 Logger.error("Could not exclude visibility to #{visibility}")
640 defp exclude_visibility(query, _visibility), do: query
642 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
645 defp restrict_thread_visibility(
647 %{"user" => %User{skip_thread_containment: true}},
652 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
655 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
659 defp restrict_thread_visibility(query, _, _), do: query
661 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
664 |> Map.put("user", reading_user)
665 |> Map.put("actor_id", user.ap_id)
668 user_activities_recipients(%{
669 "godmode" => params["godmode"],
670 "reading_user" => reading_user
673 fetch_activities(recipients, params)
677 def fetch_user_activities(user, reading_user, params \\ %{}) do
680 |> Map.put("type", ["Create", "Announce"])
681 |> Map.put("user", reading_user)
682 |> Map.put("actor_id", user.ap_id)
683 |> Map.put("pinned_activity_ids", user.pinned_activities)
686 if User.blocks?(reading_user, user) do
690 |> Map.put("blocking_user", reading_user)
691 |> Map.put("muting_user", reading_user)
695 user_activities_recipients(%{
696 "godmode" => params["godmode"],
697 "reading_user" => reading_user
700 fetch_activities(recipients, params)
704 def fetch_statuses(reading_user, params) do
707 |> Map.put("type", ["Create", "Announce"])
710 user_activities_recipients(%{
711 "godmode" => params["godmode"],
712 "reading_user" => reading_user
715 fetch_activities(recipients, params, :offset)
719 defp user_activities_recipients(%{"godmode" => true}) do
723 defp user_activities_recipients(%{"reading_user" => reading_user}) do
725 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
727 [Constants.as_public()]
731 defp restrict_since(query, %{"since_id" => ""}), do: query
733 defp restrict_since(query, %{"since_id" => since_id}) do
734 from(activity in query, where: activity.id > ^since_id)
737 defp restrict_since(query, _), do: query
739 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
740 raise "Can't use the child object without preloading!"
743 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
744 when is_list(tag_reject) and tag_reject != [] do
746 [_activity, object] in query,
747 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
751 defp restrict_tag_reject(query, _), do: query
753 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
754 raise "Can't use the child object without preloading!"
757 defp restrict_tag_all(query, %{"tag_all" => tag_all})
758 when is_list(tag_all) and tag_all != [] do
760 [_activity, object] in query,
761 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
765 defp restrict_tag_all(query, _), do: query
767 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
768 raise "Can't use the child object without preloading!"
771 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
773 [_activity, object] in query,
774 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
778 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
780 [_activity, object] in query,
781 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
785 defp restrict_tag(query, _), do: query
787 defp restrict_recipients(query, [], _user), do: query
789 defp restrict_recipients(query, recipients, nil) do
790 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
793 defp restrict_recipients(query, recipients, user) do
796 where: fragment("? && ?", ^recipients, activity.recipients),
797 or_where: activity.actor == ^user.ap_id
801 defp restrict_local(query, %{"local_only" => true}) do
802 from(activity in query, where: activity.local == true)
805 defp restrict_local(query, _), do: query
807 defp restrict_actor(query, %{"actor_id" => actor_id}) do
808 from(activity in query, where: activity.actor == ^actor_id)
811 defp restrict_actor(query, _), do: query
813 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
814 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
817 defp restrict_type(query, %{"type" => type}) do
818 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
821 defp restrict_type(query, _), do: query
823 defp restrict_state(query, %{"state" => state}) do
824 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
827 defp restrict_state(query, _), do: query
829 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
831 [_activity, object] in query,
832 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
836 defp restrict_favorited_by(query, _), do: query
838 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
839 raise "Can't use the child object without preloading!"
842 defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
844 [_activity, object] in query,
845 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
849 defp restrict_media(query, _), do: query
851 defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
853 [_activity, object] in query,
854 where: fragment("?->>'inReplyTo' is null", object.data)
858 defp restrict_replies(query, %{
859 "reply_filtering_user" => user,
860 "reply_visibility" => "self"
863 [activity, object] in query,
866 "?->>'inReplyTo' is null OR ? = ANY(?)",
874 defp restrict_replies(query, %{
875 "reply_filtering_user" => user,
876 "reply_visibility" => "following"
879 [activity, object] in query,
882 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
884 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
893 defp restrict_replies(query, _), do: query
895 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do
896 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
899 defp restrict_reblogs(query, _), do: query
901 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
903 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
904 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
907 from([activity] in query,
908 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
909 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
912 unless opts["skip_preload"] do
913 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
919 defp restrict_muted(query, _), do: query
921 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
922 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
923 domain_blocks = user.domain_blocks || []
925 following_ap_ids = User.get_friends_ap_ids(user)
928 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
931 [activity, object: o] in query,
932 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
933 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
936 "recipients_contain_blocked_domains(?, ?) = false",
942 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
949 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
957 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
966 defp restrict_blocked(query, _), do: query
968 defp restrict_unlisted(query) do
973 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
975 ^[Constants.as_public()]
980 # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
981 # the same for `restrict_media/2`, `restrict_replies/2`, 'restrict_reblogs/2'
982 # and `restrict_muted/2`
984 defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => ids})
985 when pinned in [true, "true", "1"] do
986 from(activity in query, where: activity.id in ^ids)
989 defp restrict_pinned(query, _), do: query
991 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
992 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
998 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1006 defp restrict_muted_reblogs(query, _), do: query
1008 defp restrict_instance(query, %{"instance" => instance}) do
1013 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1017 from(activity in query, where: activity.actor in ^users)
1020 defp restrict_instance(query, _), do: query
1022 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1024 defp exclude_poll_votes(query, _) do
1025 if has_named_binding?(query, :object) do
1026 from([activity, object: o] in query,
1027 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1034 defp exclude_invisible_actors(query, %{"invisible_actors" => true}), do: query
1036 defp exclude_invisible_actors(query, _opts) do
1038 User.Query.build(%{invisible: true, select: [:ap_id]})
1040 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1042 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1045 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
1046 from(activity in query, where: activity.id != ^id)
1049 defp exclude_id(query, _), do: query
1051 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
1053 defp maybe_preload_objects(query, _) do
1055 |> Activity.with_preloaded_object()
1058 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
1060 defp maybe_preload_bookmarks(query, opts) do
1062 |> Activity.with_preloaded_bookmark(opts["user"])
1065 defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
1067 |> Activity.with_preloaded_report_notes()
1070 defp maybe_preload_report_notes(query, _), do: query
1072 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
1074 defp maybe_set_thread_muted_field(query, opts) do
1076 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
1079 defp maybe_order(query, %{order: :desc}) do
1081 |> order_by(desc: :id)
1084 defp maybe_order(query, %{order: :asc}) do
1086 |> order_by(asc: :id)
1089 defp maybe_order(query, _), do: query
1091 defp fetch_activities_query_ap_ids_ops(opts) do
1092 source_user = opts["muting_user"]
1093 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1095 ap_id_relationships =
1096 ap_id_relationships ++
1097 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1103 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1105 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1106 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1108 restrict_muted_reblogs_opts =
1109 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1111 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1114 def fetch_activities_query(recipients, opts \\ %{}) do
1115 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1116 fetch_activities_query_ap_ids_ops(opts)
1119 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1123 |> maybe_preload_objects(opts)
1124 |> maybe_preload_bookmarks(opts)
1125 |> maybe_preload_report_notes(opts)
1126 |> maybe_set_thread_muted_field(opts)
1127 |> maybe_order(opts)
1128 |> restrict_recipients(recipients, opts["user"])
1129 |> restrict_replies(opts)
1130 |> restrict_tag(opts)
1131 |> restrict_tag_reject(opts)
1132 |> restrict_tag_all(opts)
1133 |> restrict_since(opts)
1134 |> restrict_local(opts)
1135 |> restrict_actor(opts)
1136 |> restrict_type(opts)
1137 |> restrict_state(opts)
1138 |> restrict_favorited_by(opts)
1139 |> restrict_blocked(restrict_blocked_opts)
1140 |> restrict_muted(restrict_muted_opts)
1141 |> restrict_media(opts)
1142 |> restrict_visibility(opts)
1143 |> restrict_thread_visibility(opts, config)
1144 |> restrict_reblogs(opts)
1145 |> restrict_pinned(opts)
1146 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1147 |> restrict_instance(opts)
1148 |> Activity.restrict_deactivated_users()
1149 |> exclude_poll_votes(opts)
1150 |> exclude_invisible_actors(opts)
1151 |> exclude_visibility(opts)
1154 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1155 list_memberships = Pleroma.List.memberships(opts["user"])
1157 fetch_activities_query(recipients ++ list_memberships, opts)
1158 |> Pagination.fetch_paginated(opts, pagination)
1160 |> maybe_update_cc(list_memberships, opts["user"])
1164 Fetch favorites activities of user with order by sort adds to favorites
1166 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1167 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1169 |> Activity.Queries.by_actor()
1170 |> Activity.Queries.by_type("Like")
1171 |> Activity.with_joined_object()
1172 |> Object.with_joined_activity()
1173 |> select([_like, object, activity], %{activity | object: object})
1174 |> order_by([like, _, _], desc_nulls_last: like.id)
1175 |> Pagination.fetch_paginated(
1176 Map.merge(params, %{"skip_order" => true}),
1182 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
1183 when is_list(list_memberships) and length(list_memberships) > 0 do
1184 Enum.map(activities, fn
1185 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
1186 if Enum.any?(bcc, &(&1 in list_memberships)) do
1187 update_in(activity.data["cc"], &[user_ap_id | &1])
1197 defp maybe_update_cc(activities, _, _), do: activities
1199 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1200 from(activity in query,
1202 fragment("? && ?", activity.recipients, ^recipients) or
1203 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1204 ^Constants.as_public() in activity.recipients)
1208 def fetch_activities_bounded(
1210 recipients_with_public,
1212 pagination \\ :keyset
1214 fetch_activities_query([], opts)
1215 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1216 |> Pagination.fetch_paginated(opts, pagination)
1220 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1221 def upload(file, opts \\ []) do
1222 with {:ok, data} <- Upload.store(file, opts) do
1223 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1225 Repo.insert(%Object{data: obj_data})
1229 @spec get_actor_url(any()) :: binary() | nil
1230 defp get_actor_url(url) when is_binary(url), do: url
1231 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1233 defp get_actor_url(url) when is_list(url) do
1239 defp get_actor_url(_url), do: nil
1241 defp object_to_user_data(data) do
1243 data["icon"]["url"] &&
1246 "url" => [%{"href" => data["icon"]["url"]}]
1250 data["image"]["url"] &&
1253 "url" => [%{"href" => data["image"]["url"]}]
1258 |> Map.get("attachment", [])
1259 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1260 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1264 |> Map.get("tag", [])
1266 %{"type" => "Emoji"} -> true
1269 |> Enum.reduce(%{}, fn %{"icon" => %{"url" => url}, "name" => name}, acc ->
1270 Map.put(acc, String.trim(name, ":"), url)
1273 locked = data["manuallyApprovesFollowers"] || false
1274 data = Transmogrifier.maybe_fix_user_object(data)
1275 discoverable = data["discoverable"] || false
1276 invisible = data["invisible"] || false
1277 actor_type = data["type"] || "Person"
1280 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1281 data["publicKey"]["publicKeyPem"]
1287 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1288 data["endpoints"]["sharedInbox"]
1295 uri: get_actor_url(data["url"]),
1301 discoverable: discoverable,
1302 invisible: invisible,
1305 follower_address: data["followers"],
1306 following_address: data["following"],
1307 bio: data["summary"],
1308 actor_type: actor_type,
1309 also_known_as: Map.get(data, "alsoKnownAs", []),
1310 public_key: public_key,
1311 inbox: data["inbox"],
1312 shared_inbox: shared_inbox
1315 # nickname can be nil because of virtual actors
1317 if data["preferredUsername"] do
1321 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1324 Map.put(user_data, :nickname, nil)
1330 def fetch_follow_information_for_user(user) do
1331 with {:ok, following_data} <-
1332 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1333 {:ok, hide_follows} <- collection_private(following_data),
1334 {:ok, followers_data} <-
1335 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1336 {:ok, hide_followers} <- collection_private(followers_data) do
1339 hide_follows: hide_follows,
1340 follower_count: normalize_counter(followers_data["totalItems"]),
1341 following_count: normalize_counter(following_data["totalItems"]),
1342 hide_followers: hide_followers
1345 {:error, _} = e -> e
1350 defp normalize_counter(counter) when is_integer(counter), do: counter
1351 defp normalize_counter(_), do: 0
1353 def maybe_update_follow_information(user_data) do
1354 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1355 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1357 {:collections_available,
1358 !!(user_data[:following_address] && user_data[:follower_address])},
1360 fetch_follow_information_for_user(user_data) do
1361 info = Map.merge(user_data[:info] || %{}, info)
1364 |> Map.put(:info, info)
1366 {:user_type_check, false} ->
1369 {:collections_available, false} ->
1372 {:enabled, false} ->
1377 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1384 defp collection_private(%{"first" => %{"type" => type}})
1385 when type in ["CollectionPage", "OrderedCollectionPage"],
1388 defp collection_private(%{"first" => first}) do
1389 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1390 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1393 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1394 {:error, _} = e -> e
1399 defp collection_private(_data), do: {:ok, true}
1401 def user_data_from_user_object(data) do
1402 with {:ok, data} <- MRF.filter(data),
1403 {:ok, data} <- object_to_user_data(data) do
1410 def fetch_and_prepare_user_from_ap_id(ap_id) do
1411 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1412 {:ok, data} <- user_data_from_user_object(data),
1413 data <- maybe_update_follow_information(data) do
1416 {:error, "Object has been deleted"} = e ->
1417 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1421 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1426 def make_user_from_ap_id(ap_id) do
1427 user = User.get_cached_by_ap_id(ap_id)
1429 if user && !User.ap_enabled?(user) do
1430 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1432 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1435 |> User.remote_user_changeset(data)
1436 |> User.update_and_set_cache()
1439 |> User.remote_user_changeset()
1449 def make_user_from_nickname(nickname) do
1450 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1451 make_user_from_ap_id(ap_id)
1453 _e -> {:error, "No AP id in WebFinger"}
1457 # filter out broken threads
1458 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1459 entire_thread_visible_for_user?(activity, user)
1462 # do post-processing on a specific activity
1463 def contain_activity(%Activity{} = activity, %User{} = user) do
1464 contain_broken_threads(activity, user)
1467 def fetch_direct_messages_query do
1469 |> restrict_type(%{"type" => "Create"})
1470 |> restrict_visibility(%{visibility: "direct"})
1471 |> order_by([activity], asc: activity.id)