1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
70 defp check_actor_is_active(actor) do
71 if not is_nil(actor) do
72 with user <- User.get_cached_by_ap_id(actor),
73 false <- user.deactivated do
83 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
84 limit = Config.get([:instance, :remote_limit])
85 String.length(content) <= limit
88 defp check_remote_limit(_), do: true
90 def increase_note_count_if_public(actor, object) do
91 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
94 def decrease_note_count_if_public(actor, object) do
95 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
98 def increase_replies_count_if_reply(%{
99 "object" => %{"inReplyTo" => reply_ap_id} = object,
102 if is_public?(object) do
103 Object.increase_replies_count(reply_ap_id)
107 def increase_replies_count_if_reply(_create_data), do: :noop
109 def decrease_replies_count_if_reply(%Object{
110 data: %{"inReplyTo" => reply_ap_id} = object
112 if is_public?(object) do
113 Object.decrease_replies_count(reply_ap_id)
117 def decrease_replies_count_if_reply(_object), do: :noop
119 def increase_poll_votes_if_vote(%{
120 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
124 Object.increase_vote_count(reply_ap_id, name, actor)
127 def increase_poll_votes_if_vote(_create_data), do: :noop
129 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
130 def persist(object, meta) do
131 with local <- Keyword.fetch!(meta, :local),
132 {recipients, _, _} <- get_recipients(object),
134 Repo.insert(%Activity{
137 recipients: recipients,
138 actor: object["actor"]
140 {:ok, activity, meta}
144 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
145 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
146 with nil <- Activity.normalize(map),
147 map <- lazy_put_activity_defaults(map, fake),
148 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
149 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
150 {:ok, map} <- MRF.filter(map),
151 {recipients, _, _} = get_recipients(map),
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
156 Repo.insert(%Activity{
160 recipients: recipients
163 # Splice in the child object if we have one.
165 if not is_nil(object) do
166 Map.put(activity, :object, object)
171 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
173 Notification.create_notifications(activity)
175 conversation = create_or_bump_conversation(activity, map["actor"])
176 participations = get_participations(conversation)
178 stream_out_participations(participations)
181 %Activity{} = activity ->
184 {:fake, true, map, recipients} ->
185 activity = %Activity{
189 recipients: recipients,
193 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
201 defp create_or_bump_conversation(activity, actor) do
202 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
203 %User{} = user <- User.get_cached_by_ap_id(actor),
204 Participation.mark_as_read(user, conversation) do
209 defp get_participations({:ok, conversation}) do
211 |> Repo.preload(:participations, force: true)
212 |> Map.get(:participations)
215 defp get_participations(_), do: []
217 def stream_out_participations(participations) do
220 |> Repo.preload(:user)
222 Streamer.stream("participation", participations)
225 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
226 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
227 conversation = Repo.preload(conversation, :participations),
229 fetch_latest_activity_id_for_context(conversation.ap_id, %{
231 "blocking_user" => user
233 if last_activity_id do
234 stream_out_participations(conversation.participations)
239 def stream_out_participations(_, _), do: :noop
241 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
242 when data_type in ["Create", "Announce", "Delete"] do
244 |> Topics.get_activity_topics()
245 |> Streamer.stream(activity)
248 def stream_out(_activity) do
252 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
253 def create(params, fake \\ false) do
254 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
259 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
260 additional = params[:additional] || %{}
261 # only accept false as false value
262 local = !(params[:local] == false)
263 published = params[:published]
264 quick_insert? = Config.get([:env]) == :benchmark
268 %{to: to, actor: actor, published: published, context: context, object: object},
271 {:ok, activity} <- insert(create_data, local, fake),
272 {:fake, false, activity} <- {:fake, fake, activity},
273 _ <- increase_replies_count_if_reply(create_data),
274 _ <- increase_poll_votes_if_vote(create_data),
275 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
276 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
277 :ok <- maybe_federate(activity) do
280 {:quick_insert, true, activity} ->
283 {:fake, true, activity} ->
287 Repo.rollback(message)
291 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
292 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
293 additional = params[:additional] || %{}
294 # only accept false as false value
295 local = !(params[:local] == false)
296 published = params[:published]
300 %{to: to, actor: actor, published: published, context: context, object: object},
303 {:ok, activity} <- insert(listen_data, local),
304 :ok <- maybe_federate(activity) do
309 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
310 def accept(params) do
311 accept_or_reject("Accept", params)
314 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
315 def reject(params) do
316 accept_or_reject("Reject", params)
319 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
320 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
321 local = Map.get(params, :local, true)
322 activity_id = Map.get(params, :activity_id, nil)
325 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
326 |> Utils.maybe_put("id", activity_id),
327 {:ok, activity} <- insert(data, local),
328 :ok <- maybe_federate(activity) do
333 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
334 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
335 local = !(params[:local] == false)
336 activity_id = params[:activity_id]
345 data <- Utils.maybe_put(data, "id", activity_id),
346 {:ok, activity} <- insert(data, local),
347 :ok <- maybe_federate(activity) do
352 @spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
353 {:ok, Activity.t(), Object.t()} | {:error, any()}
354 def react_with_emoji(user, object, emoji, options \\ []) do
355 with {:ok, result} <-
356 Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
361 defp do_react_with_emoji(user, object, emoji, options) do
362 with local <- Keyword.get(options, :local, true),
363 activity_id <- Keyword.get(options, :activity_id, nil),
364 true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
365 reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
366 {:ok, activity} <- insert(reaction_data, local),
367 {:ok, object} <- add_emoji_reaction_to_object(activity, object),
368 :ok <- maybe_federate(activity) do
369 {:ok, activity, object}
371 false -> {:error, false}
372 {:error, error} -> Repo.rollback(error)
376 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
377 {:ok, Activity.t(), Object.t()} | {:error, any()}
379 %User{ap_id: _} = user,
380 %Object{data: %{"id" => _}} = object,
385 with {:ok, result} <-
386 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
391 defp do_announce(user, object, activity_id, local, public) do
392 with true <- is_announceable?(object, user, public),
393 object <- Object.get_by_id(object.id),
394 announce_data <- make_announce_data(user, object, activity_id, public),
395 {:ok, activity} <- insert(announce_data, local),
396 {:ok, object} <- add_announce_to_object(activity, object),
397 :ok <- maybe_federate(activity) do
398 {:ok, activity, object}
400 false -> {:error, false}
401 {:error, error} -> Repo.rollback(error)
405 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
406 {:ok, Activity.t()} | {:error, any()}
407 def follow(follower, followed, activity_id \\ nil, local \\ true) do
408 with {:ok, result} <-
409 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
414 defp do_follow(follower, followed, activity_id, local) do
415 with data <- make_follow_data(follower, followed, activity_id),
416 {:ok, activity} <- insert(data, local),
417 :ok <- maybe_federate(activity) do
420 {:error, error} -> Repo.rollback(error)
424 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
425 {:ok, Activity.t()} | nil | {:error, any()}
426 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
427 with {:ok, result} <-
428 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
433 defp do_unfollow(follower, followed, activity_id, local) do
434 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
435 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
436 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
437 {:ok, activity} <- insert(unfollow_data, local),
438 :ok <- maybe_federate(activity) do
442 {:error, error} -> Repo.rollback(error)
446 @spec delete(User.t() | Object.t(), keyword()) :: {:ok, User.t() | Object.t()} | {:error, any()}
447 def delete(entity, options \\ []) do
448 with {:ok, result} <- Repo.transaction(fn -> do_delete(entity, options) end) do
453 defp do_delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _) do
455 "to" => [follower_address],
458 "object" => %{"type" => "Person", "id" => ap_id}
460 {:ok, activity} <- insert(data, true, true, true),
461 :ok <- maybe_federate(activity) do
466 defp do_delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
467 local = Keyword.get(options, :local, true)
468 activity_id = Keyword.get(options, :activity_id, nil)
469 actor = Keyword.get(options, :actor, actor)
471 user = User.get_cached_by_ap_id(actor)
472 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
474 with create_activity <- Activity.get_create_by_object_ap_id(id),
481 "deleted_activity_id" => create_activity && create_activity.id
483 |> maybe_put("id", activity_id),
484 {:ok, activity} <- insert(data, local, false),
485 {:ok, object, _create_activity} <- Object.delete(object),
486 stream_out_participations(object, user),
487 _ <- decrease_replies_count_if_reply(object),
488 {:ok, _actor} <- decrease_note_count_if_public(user, object),
489 :ok <- maybe_federate(activity) do
497 defp do_delete(%Object{data: %{"type" => "Tombstone", "id" => ap_id}}, _) do
500 |> Activity.Queries.by_object_id()
501 |> Activity.Queries.by_type("Delete")
507 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
508 {:ok, Activity.t()} | {:error, any()}
509 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
510 with {:ok, result} <-
511 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
516 defp do_block(blocker, blocked, activity_id, local) do
517 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
518 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
520 if unfollow_blocked do
521 follow_activity = fetch_latest_follow(blocker, blocked)
522 if follow_activity, do: unfollow(blocker, blocked, nil, local)
525 with true <- outgoing_blocks,
526 block_data <- make_block_data(blocker, blocked, activity_id),
527 {:ok, activity} <- insert(block_data, local),
528 :ok <- maybe_federate(activity) do
531 {:error, error} -> Repo.rollback(error)
535 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
545 # only accept false as false value
546 local = !(params[:local] == false)
547 forward = !(params[:forward] == false)
549 additional = params[:additional] || %{}
553 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
555 Map.merge(additional, %{"to" => [], "cc" => []})
558 with flag_data <- make_flag_data(params, additional),
559 {:ok, activity} <- insert(flag_data, local),
560 {:ok, stripped_activity} <- strip_report_status_data(activity),
561 :ok <- maybe_federate(stripped_activity) do
562 User.all_superusers()
563 |> Enum.filter(fn user -> not is_nil(user.email) end)
564 |> Enum.each(fn superuser ->
566 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
567 |> Pleroma.Emails.Mailer.deliver_async()
574 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
575 def move(%User{} = origin, %User{} = target, local \\ true) do
578 "actor" => origin.ap_id,
579 "object" => origin.ap_id,
580 "target" => target.ap_id
583 with true <- origin.ap_id in target.also_known_as,
584 {:ok, activity} <- insert(params, local) do
585 maybe_federate(activity)
587 BackgroundWorker.enqueue("move_following", %{
588 "origin_id" => origin.id,
589 "target_id" => target.id
594 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
599 def fetch_activities_for_context_query(context, opts) do
600 public = [Constants.as_public()]
604 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
607 from(activity in Activity)
608 |> maybe_preload_objects(opts)
609 |> maybe_preload_bookmarks(opts)
610 |> maybe_set_thread_muted_field(opts)
611 |> restrict_blocked(opts)
612 |> restrict_recipients(recipients, opts["user"])
616 "?->>'type' = ? and ?->>'context' = ?",
623 |> exclude_poll_votes(opts)
625 |> order_by([activity], desc: activity.id)
628 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
629 def fetch_activities_for_context(context, opts \\ %{}) do
631 |> fetch_activities_for_context_query(opts)
635 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
636 FlakeId.Ecto.CompatType.t() | nil
637 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
639 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
645 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
646 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
647 opts = Map.drop(opts, ["user"])
649 [Constants.as_public()]
650 |> fetch_activities_query(opts)
651 |> restrict_unlisted()
652 |> Pagination.fetch_paginated(opts, pagination)
655 @valid_visibilities ~w[direct unlisted public private]
657 defp restrict_visibility(query, %{visibility: visibility})
658 when is_list(visibility) do
659 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
665 "activity_visibility(?, ?, ?) = ANY (?)",
675 Logger.error("Could not restrict visibility to #{visibility}")
679 defp restrict_visibility(query, %{visibility: visibility})
680 when visibility in @valid_visibilities do
684 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
688 defp restrict_visibility(_query, %{visibility: visibility})
689 when visibility not in @valid_visibilities do
690 Logger.error("Could not restrict visibility to #{visibility}")
693 defp restrict_visibility(query, _visibility), do: query
695 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
696 when is_list(visibility) do
697 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
702 "activity_visibility(?, ?, ?) = ANY (?)",
710 Logger.error("Could not exclude visibility to #{visibility}")
715 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
716 when visibility in @valid_visibilities do
721 "activity_visibility(?, ?, ?) = ?",
730 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
731 when visibility not in [nil | @valid_visibilities] do
732 Logger.error("Could not exclude visibility to #{visibility}")
736 defp exclude_visibility(query, _visibility), do: query
738 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
741 defp restrict_thread_visibility(
743 %{"user" => %User{skip_thread_containment: true}},
748 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
751 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
755 defp restrict_thread_visibility(query, _, _), do: query
757 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
760 |> Map.put("user", reading_user)
761 |> Map.put("actor_id", user.ap_id)
764 user_activities_recipients(%{
765 "godmode" => params["godmode"],
766 "reading_user" => reading_user
769 fetch_activities(recipients, params)
773 def fetch_user_activities(user, reading_user, params \\ %{}) do
776 |> Map.put("type", ["Create", "Announce"])
777 |> Map.put("user", reading_user)
778 |> Map.put("actor_id", user.ap_id)
779 |> Map.put("pinned_activity_ids", user.pinned_activities)
782 if User.blocks?(reading_user, user) do
786 |> Map.put("blocking_user", reading_user)
787 |> Map.put("muting_user", reading_user)
791 user_activities_recipients(%{
792 "godmode" => params["godmode"],
793 "reading_user" => reading_user
796 fetch_activities(recipients, params)
800 def fetch_statuses(reading_user, params) do
803 |> Map.put("type", ["Create", "Announce"])
806 user_activities_recipients(%{
807 "godmode" => params["godmode"],
808 "reading_user" => reading_user
811 fetch_activities(recipients, params, :offset)
815 defp user_activities_recipients(%{"godmode" => true}) do
819 defp user_activities_recipients(%{"reading_user" => reading_user}) do
821 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
823 [Constants.as_public()]
827 defp restrict_since(query, %{"since_id" => ""}), do: query
829 defp restrict_since(query, %{"since_id" => since_id}) do
830 from(activity in query, where: activity.id > ^since_id)
833 defp restrict_since(query, _), do: query
835 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
836 raise "Can't use the child object without preloading!"
839 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
840 when is_list(tag_reject) and tag_reject != [] do
842 [_activity, object] in query,
843 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
847 defp restrict_tag_reject(query, _), do: query
849 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
850 raise "Can't use the child object without preloading!"
853 defp restrict_tag_all(query, %{"tag_all" => tag_all})
854 when is_list(tag_all) and tag_all != [] do
856 [_activity, object] in query,
857 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
861 defp restrict_tag_all(query, _), do: query
863 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
864 raise "Can't use the child object without preloading!"
867 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
869 [_activity, object] in query,
870 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
874 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
876 [_activity, object] in query,
877 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
881 defp restrict_tag(query, _), do: query
883 defp restrict_recipients(query, [], _user), do: query
885 defp restrict_recipients(query, recipients, nil) do
886 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
889 defp restrict_recipients(query, recipients, user) do
892 where: fragment("? && ?", ^recipients, activity.recipients),
893 or_where: activity.actor == ^user.ap_id
897 defp restrict_local(query, %{"local_only" => true}) do
898 from(activity in query, where: activity.local == true)
901 defp restrict_local(query, _), do: query
903 defp restrict_actor(query, %{"actor_id" => actor_id}) do
904 from(activity in query, where: activity.actor == ^actor_id)
907 defp restrict_actor(query, _), do: query
909 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
910 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
913 defp restrict_type(query, %{"type" => type}) do
914 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
917 defp restrict_type(query, _), do: query
919 defp restrict_state(query, %{"state" => state}) do
920 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
923 defp restrict_state(query, _), do: query
925 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
927 [_activity, object] in query,
928 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
932 defp restrict_favorited_by(query, _), do: query
934 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
935 raise "Can't use the child object without preloading!"
938 defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
940 [_activity, object] in query,
941 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
945 defp restrict_media(query, _), do: query
947 defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
949 [_activity, object] in query,
950 where: fragment("?->>'inReplyTo' is null", object.data)
954 defp restrict_replies(query, %{
955 "reply_filtering_user" => user,
956 "reply_visibility" => "self"
959 [activity, object] in query,
962 "?->>'inReplyTo' is null OR ? = ANY(?)",
970 defp restrict_replies(query, %{
971 "reply_filtering_user" => user,
972 "reply_visibility" => "following"
975 [activity, object] in query,
978 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
980 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
989 defp restrict_replies(query, _), do: query
991 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do
992 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
995 defp restrict_reblogs(query, _), do: query
997 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
999 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
1000 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
1003 from([activity] in query,
1004 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1005 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
1008 unless opts["skip_preload"] do
1009 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1015 defp restrict_muted(query, _), do: query
1017 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
1018 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1019 domain_blocks = user.domain_blocks || []
1021 following_ap_ids = User.get_friends_ap_ids(user)
1024 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1027 [activity, object: o] in query,
1028 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1029 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
1032 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1039 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1047 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1056 defp restrict_blocked(query, _), do: query
1058 defp restrict_unlisted(query) do
1063 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1065 ^[Constants.as_public()]
1070 # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
1071 # the same for `restrict_media/2`, `restrict_replies/2`, 'restrict_reblogs/2'
1072 # and `restrict_muted/2`
1074 defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => ids})
1075 when pinned in [true, "true", "1"] do
1076 from(activity in query, where: activity.id in ^ids)
1079 defp restrict_pinned(query, _), do: query
1081 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
1082 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
1088 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1096 defp restrict_muted_reblogs(query, _), do: query
1098 defp restrict_instance(query, %{"instance" => instance}) do
1103 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1107 from(activity in query, where: activity.actor in ^users)
1110 defp restrict_instance(query, _), do: query
1112 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1114 defp exclude_poll_votes(query, _) do
1115 if has_named_binding?(query, :object) do
1116 from([activity, object: o] in query,
1117 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1124 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
1125 from(activity in query, where: activity.id != ^id)
1128 defp exclude_id(query, _), do: query
1130 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
1132 defp maybe_preload_objects(query, _) do
1134 |> Activity.with_preloaded_object()
1137 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
1139 defp maybe_preload_bookmarks(query, opts) do
1141 |> Activity.with_preloaded_bookmark(opts["user"])
1144 defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
1146 |> Activity.with_preloaded_report_notes()
1149 defp maybe_preload_report_notes(query, _), do: query
1151 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
1153 defp maybe_set_thread_muted_field(query, opts) do
1155 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
1158 defp maybe_order(query, %{order: :desc}) do
1160 |> order_by(desc: :id)
1163 defp maybe_order(query, %{order: :asc}) do
1165 |> order_by(asc: :id)
1168 defp maybe_order(query, _), do: query
1170 defp fetch_activities_query_ap_ids_ops(opts) do
1171 source_user = opts["muting_user"]
1172 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1174 ap_id_relationships =
1175 ap_id_relationships ++
1176 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1182 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1184 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1185 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1187 restrict_muted_reblogs_opts =
1188 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1190 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1193 def fetch_activities_query(recipients, opts \\ %{}) do
1194 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1195 fetch_activities_query_ap_ids_ops(opts)
1198 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1202 |> maybe_preload_objects(opts)
1203 |> maybe_preload_bookmarks(opts)
1204 |> maybe_preload_report_notes(opts)
1205 |> maybe_set_thread_muted_field(opts)
1206 |> maybe_order(opts)
1207 |> restrict_recipients(recipients, opts["user"])
1208 |> restrict_replies(opts)
1209 |> restrict_tag(opts)
1210 |> restrict_tag_reject(opts)
1211 |> restrict_tag_all(opts)
1212 |> restrict_since(opts)
1213 |> restrict_local(opts)
1214 |> restrict_actor(opts)
1215 |> restrict_type(opts)
1216 |> restrict_state(opts)
1217 |> restrict_favorited_by(opts)
1218 |> restrict_blocked(restrict_blocked_opts)
1219 |> restrict_muted(restrict_muted_opts)
1220 |> restrict_media(opts)
1221 |> restrict_visibility(opts)
1222 |> restrict_thread_visibility(opts, config)
1223 |> restrict_reblogs(opts)
1224 |> restrict_pinned(opts)
1225 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1226 |> restrict_instance(opts)
1227 |> Activity.restrict_deactivated_users()
1228 |> exclude_poll_votes(opts)
1229 |> exclude_visibility(opts)
1232 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1233 list_memberships = Pleroma.List.memberships(opts["user"])
1235 fetch_activities_query(recipients ++ list_memberships, opts)
1236 |> Pagination.fetch_paginated(opts, pagination)
1238 |> maybe_update_cc(list_memberships, opts["user"])
1242 Fetch favorites activities of user with order by sort adds to favorites
1244 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1245 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1247 |> Activity.Queries.by_actor()
1248 |> Activity.Queries.by_type("Like")
1249 |> Activity.with_joined_object()
1250 |> Object.with_joined_activity()
1251 |> select([_like, object, activity], %{activity | object: object})
1252 |> order_by([like, _, _], desc: like.id)
1253 |> Pagination.fetch_paginated(
1254 Map.merge(params, %{"skip_order" => true}),
1260 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
1261 when is_list(list_memberships) and length(list_memberships) > 0 do
1262 Enum.map(activities, fn
1263 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
1264 if Enum.any?(bcc, &(&1 in list_memberships)) do
1265 update_in(activity.data["cc"], &[user_ap_id | &1])
1275 defp maybe_update_cc(activities, _, _), do: activities
1277 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1278 from(activity in query,
1280 fragment("? && ?", activity.recipients, ^recipients) or
1281 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1282 ^Constants.as_public() in activity.recipients)
1286 def fetch_activities_bounded(
1288 recipients_with_public,
1290 pagination \\ :keyset
1292 fetch_activities_query([], opts)
1293 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1294 |> Pagination.fetch_paginated(opts, pagination)
1298 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1299 def upload(file, opts \\ []) do
1300 with {:ok, data} <- Upload.store(file, opts) do
1303 Map.put(data, "actor", opts[:actor])
1308 Repo.insert(%Object{data: obj_data})
1312 @spec get_actor_url(any()) :: binary() | nil
1313 defp get_actor_url(url) when is_binary(url), do: url
1314 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1316 defp get_actor_url(url) when is_list(url) do
1322 defp get_actor_url(_url), do: nil
1324 defp object_to_user_data(data) do
1326 data["icon"]["url"] &&
1329 "url" => [%{"href" => data["icon"]["url"]}]
1333 data["image"]["url"] &&
1336 "url" => [%{"href" => data["image"]["url"]}]
1341 |> Map.get("attachment", [])
1342 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1343 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1347 |> Map.get("tag", [])
1349 %{"type" => "Emoji"} -> true
1352 |> Enum.reduce(%{}, fn %{"icon" => %{"url" => url}, "name" => name}, acc ->
1353 Map.put(acc, String.trim(name, ":"), url)
1356 locked = data["manuallyApprovesFollowers"] || false
1357 data = Transmogrifier.maybe_fix_user_object(data)
1358 discoverable = data["discoverable"] || false
1359 invisible = data["invisible"] || false
1360 actor_type = data["type"] || "Person"
1363 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1364 data["publicKey"]["publicKeyPem"]
1370 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1371 data["endpoints"]["sharedInbox"]
1378 uri: get_actor_url(data["url"]),
1384 discoverable: discoverable,
1385 invisible: invisible,
1388 follower_address: data["followers"],
1389 following_address: data["following"],
1390 bio: data["summary"],
1391 actor_type: actor_type,
1392 also_known_as: Map.get(data, "alsoKnownAs", []),
1393 public_key: public_key,
1394 inbox: data["inbox"],
1395 shared_inbox: shared_inbox
1398 # nickname can be nil because of virtual actors
1400 if data["preferredUsername"] do
1404 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1407 Map.put(user_data, :nickname, nil)
1413 def fetch_follow_information_for_user(user) do
1414 with {:ok, following_data} <-
1415 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1416 {:ok, hide_follows} <- collection_private(following_data),
1417 {:ok, followers_data} <-
1418 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1419 {:ok, hide_followers} <- collection_private(followers_data) do
1422 hide_follows: hide_follows,
1423 follower_count: normalize_counter(followers_data["totalItems"]),
1424 following_count: normalize_counter(following_data["totalItems"]),
1425 hide_followers: hide_followers
1428 {:error, _} = e -> e
1433 defp normalize_counter(counter) when is_integer(counter), do: counter
1434 defp normalize_counter(_), do: 0
1436 def maybe_update_follow_information(user_data) do
1437 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1438 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1440 {:collections_available,
1441 !!(user_data[:following_address] && user_data[:follower_address])},
1443 fetch_follow_information_for_user(user_data) do
1444 info = Map.merge(user_data[:info] || %{}, info)
1447 |> Map.put(:info, info)
1449 {:user_type_check, false} ->
1452 {:collections_available, false} ->
1455 {:enabled, false} ->
1460 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1467 defp collection_private(%{"first" => %{"type" => type}})
1468 when type in ["CollectionPage", "OrderedCollectionPage"],
1471 defp collection_private(%{"first" => first}) do
1472 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1473 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1476 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1477 {:error, _} = e -> e
1482 defp collection_private(_data), do: {:ok, true}
1484 def user_data_from_user_object(data) do
1485 with {:ok, data} <- MRF.filter(data),
1486 {:ok, data} <- object_to_user_data(data) do
1493 def fetch_and_prepare_user_from_ap_id(ap_id) do
1494 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1495 {:ok, data} <- user_data_from_user_object(data),
1496 data <- maybe_update_follow_information(data) do
1499 {:error, "Object has been deleted"} = e ->
1500 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1504 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1509 def make_user_from_ap_id(ap_id) do
1510 user = User.get_cached_by_ap_id(ap_id)
1512 if user && !User.ap_enabled?(user) do
1513 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1515 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1518 |> User.remote_user_changeset(data)
1519 |> User.update_and_set_cache()
1522 |> User.remote_user_changeset()
1532 def make_user_from_nickname(nickname) do
1533 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1534 make_user_from_ap_id(ap_id)
1536 _e -> {:error, "No AP id in WebFinger"}
1540 # filter out broken threads
1541 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1542 entire_thread_visible_for_user?(activity, user)
1545 # do post-processing on a specific activity
1546 def contain_activity(%Activity{} = activity, %User{} = user) do
1547 contain_broken_threads(activity, user)
1550 def fetch_direct_messages_query do
1552 |> restrict_type(%{"type" => "Create"})
1553 |> restrict_visibility(%{visibility: "direct"})
1554 |> order_by([activity], asc: activity.id)