1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
70 defp check_actor_is_active(actor) do
71 if not is_nil(actor) do
72 with user <- User.get_cached_by_ap_id(actor),
73 false <- user.deactivated do
83 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
84 limit = Config.get([:instance, :remote_limit])
85 String.length(content) <= limit
88 defp check_remote_limit(_), do: true
90 def increase_note_count_if_public(actor, object) do
91 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
94 def decrease_note_count_if_public(actor, object) do
95 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
98 def increase_replies_count_if_reply(%{
99 "object" => %{"inReplyTo" => reply_ap_id} = object,
102 if is_public?(object) do
103 Object.increase_replies_count(reply_ap_id)
107 def increase_replies_count_if_reply(_create_data), do: :noop
109 def decrease_replies_count_if_reply(%Object{
110 data: %{"inReplyTo" => reply_ap_id} = object
112 if is_public?(object) do
113 Object.decrease_replies_count(reply_ap_id)
117 def decrease_replies_count_if_reply(_object), do: :noop
119 def increase_poll_votes_if_vote(%{
120 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
124 Object.increase_vote_count(reply_ap_id, name, actor)
127 def increase_poll_votes_if_vote(_create_data), do: :noop
129 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
130 def persist(object, meta) do
131 with local <- Keyword.fetch!(meta, :local),
132 {recipients, _, _} <- get_recipients(object),
134 Repo.insert(%Activity{
137 recipients: recipients,
138 actor: object["actor"]
140 {:ok, activity, meta}
144 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
145 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
146 with nil <- Activity.normalize(map),
147 map <- lazy_put_activity_defaults(map, fake),
148 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
149 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
150 {:ok, map} <- MRF.filter(map),
151 {recipients, _, _} = get_recipients(map),
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
156 Repo.insert(%Activity{
160 recipients: recipients
163 # Splice in the child object if we have one.
165 if not is_nil(object) do
166 Map.put(activity, :object, object)
171 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
175 %Activity{} = activity ->
178 {:fake, true, map, recipients} ->
179 activity = %Activity{
183 recipients: recipients,
187 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
195 def notify_and_stream(activity) do
196 Notification.create_notifications(activity)
198 conversation = create_or_bump_conversation(activity, activity.actor)
199 participations = get_participations(conversation)
201 stream_out_participations(participations)
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor),
207 Participation.mark_as_read(user, conversation) do
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
228 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
229 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
230 conversation = Repo.preload(conversation, :participations),
232 fetch_latest_activity_id_for_context(conversation.ap_id, %{
234 "blocking_user" => user
236 if last_activity_id do
237 stream_out_participations(conversation.participations)
242 def stream_out_participations(_, _), do: :noop
244 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
245 when data_type in ["Create", "Announce", "Delete"] do
247 |> Topics.get_activity_topics()
248 |> Streamer.stream(activity)
251 def stream_out(_activity) do
255 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
256 def create(params, fake \\ false) do
257 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
262 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
263 additional = params[:additional] || %{}
264 # only accept false as false value
265 local = !(params[:local] == false)
266 published = params[:published]
267 quick_insert? = Config.get([:env]) == :benchmark
271 %{to: to, actor: actor, published: published, context: context, object: object},
274 {:ok, activity} <- insert(create_data, local, fake),
275 {:fake, false, activity} <- {:fake, fake, activity},
276 _ <- increase_replies_count_if_reply(create_data),
277 _ <- increase_poll_votes_if_vote(create_data),
278 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
279 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
280 _ <- notify_and_stream(activity),
281 :ok <- maybe_federate(activity) do
284 {:quick_insert, true, activity} ->
287 {:fake, true, activity} ->
291 Repo.rollback(message)
295 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
296 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
297 additional = params[:additional] || %{}
298 # only accept false as false value
299 local = !(params[:local] == false)
300 published = params[:published]
304 %{to: to, actor: actor, published: published, context: context, object: object},
307 {:ok, activity} <- insert(listen_data, local),
308 _ <- notify_and_stream(activity),
309 :ok <- maybe_federate(activity) do
314 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
315 def accept(params) do
316 accept_or_reject("Accept", params)
319 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
320 def reject(params) do
321 accept_or_reject("Reject", params)
324 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
325 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
326 local = Map.get(params, :local, true)
327 activity_id = Map.get(params, :activity_id, nil)
330 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
331 |> Utils.maybe_put("id", activity_id),
332 {:ok, activity} <- insert(data, local),
333 _ <- notify_and_stream(activity),
334 :ok <- maybe_federate(activity) do
339 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
340 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
341 local = !(params[:local] == false)
342 activity_id = params[:activity_id]
351 data <- Utils.maybe_put(data, "id", activity_id),
352 {:ok, activity} <- insert(data, local),
353 _ <- notify_and_stream(activity),
354 :ok <- maybe_federate(activity) do
359 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
360 {:ok, Activity.t()} | {:error, any()}
361 def follow(follower, followed, activity_id \\ nil, local \\ true) do
362 with {:ok, result} <-
363 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
368 defp do_follow(follower, followed, activity_id, local) do
369 with data <- make_follow_data(follower, followed, activity_id),
370 {:ok, activity} <- insert(data, local),
371 _ <- notify_and_stream(activity),
372 :ok <- maybe_federate(activity) do
375 {:error, error} -> Repo.rollback(error)
379 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
380 {:ok, Activity.t()} | nil | {:error, any()}
381 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
382 with {:ok, result} <-
383 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
388 defp do_unfollow(follower, followed, activity_id, local) do
389 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
390 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
391 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
392 {:ok, activity} <- insert(unfollow_data, local),
393 _ <- notify_and_stream(activity),
394 :ok <- maybe_federate(activity) do
398 {:error, error} -> Repo.rollback(error)
402 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
403 {:ok, Activity.t()} | {:error, any()}
404 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
405 with {:ok, result} <-
406 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
411 defp do_block(blocker, blocked, activity_id, local) do
412 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
414 if unfollow_blocked do
415 follow_activity = fetch_latest_follow(blocker, blocked)
416 if follow_activity, do: unfollow(blocker, blocked, nil, local)
419 with block_data <- make_block_data(blocker, blocked, activity_id),
420 {:ok, activity} <- insert(block_data, local),
421 _ <- notify_and_stream(activity),
422 :ok <- maybe_federate(activity) do
425 {:error, error} -> Repo.rollback(error)
429 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
439 # only accept false as false value
440 local = !(params[:local] == false)
441 forward = !(params[:forward] == false)
443 additional = params[:additional] || %{}
447 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
449 Map.merge(additional, %{"to" => [], "cc" => []})
452 with flag_data <- make_flag_data(params, additional),
453 {:ok, activity} <- insert(flag_data, local),
454 {:ok, stripped_activity} <- strip_report_status_data(activity),
455 _ <- notify_and_stream(activity),
456 :ok <- maybe_federate(stripped_activity) do
457 User.all_superusers()
458 |> Enum.filter(fn user -> not is_nil(user.email) end)
459 |> Enum.each(fn superuser ->
461 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
462 |> Pleroma.Emails.Mailer.deliver_async()
469 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
470 def move(%User{} = origin, %User{} = target, local \\ true) do
473 "actor" => origin.ap_id,
474 "object" => origin.ap_id,
475 "target" => target.ap_id
478 with true <- origin.ap_id in target.also_known_as,
479 {:ok, activity} <- insert(params, local),
480 _ <- notify_and_stream(activity) do
481 maybe_federate(activity)
483 BackgroundWorker.enqueue("move_following", %{
484 "origin_id" => origin.id,
485 "target_id" => target.id
490 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
495 def fetch_activities_for_context_query(context, opts) do
496 public = [Constants.as_public()]
500 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
503 from(activity in Activity)
504 |> maybe_preload_objects(opts)
505 |> maybe_preload_bookmarks(opts)
506 |> maybe_set_thread_muted_field(opts)
507 |> restrict_blocked(opts)
508 |> restrict_recipients(recipients, opts["user"])
512 "?->>'type' = ? and ?->>'context' = ?",
519 |> exclude_poll_votes(opts)
521 |> order_by([activity], desc: activity.id)
524 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
525 def fetch_activities_for_context(context, opts \\ %{}) do
527 |> fetch_activities_for_context_query(opts)
531 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
532 FlakeId.Ecto.CompatType.t() | nil
533 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
535 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
541 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
542 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
543 opts = Map.drop(opts, ["user"])
545 [Constants.as_public()]
546 |> fetch_activities_query(opts)
547 |> restrict_unlisted()
548 |> Pagination.fetch_paginated(opts, pagination)
551 @valid_visibilities ~w[direct unlisted public private]
553 defp restrict_visibility(query, %{visibility: visibility})
554 when is_list(visibility) do
555 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
561 "activity_visibility(?, ?, ?) = ANY (?)",
571 Logger.error("Could not restrict visibility to #{visibility}")
575 defp restrict_visibility(query, %{visibility: visibility})
576 when visibility in @valid_visibilities do
580 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
584 defp restrict_visibility(_query, %{visibility: visibility})
585 when visibility not in @valid_visibilities do
586 Logger.error("Could not restrict visibility to #{visibility}")
589 defp restrict_visibility(query, _visibility), do: query
591 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
592 when is_list(visibility) do
593 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
598 "activity_visibility(?, ?, ?) = ANY (?)",
606 Logger.error("Could not exclude visibility to #{visibility}")
611 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
612 when visibility in @valid_visibilities do
617 "activity_visibility(?, ?, ?) = ?",
626 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
627 when visibility not in [nil | @valid_visibilities] do
628 Logger.error("Could not exclude visibility to #{visibility}")
632 defp exclude_visibility(query, _visibility), do: query
634 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
637 defp restrict_thread_visibility(
639 %{"user" => %User{skip_thread_containment: true}},
644 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
647 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
651 defp restrict_thread_visibility(query, _, _), do: query
653 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
656 |> Map.put("user", reading_user)
657 |> Map.put("actor_id", user.ap_id)
660 user_activities_recipients(%{
661 "godmode" => params["godmode"],
662 "reading_user" => reading_user
665 fetch_activities(recipients, params)
669 def fetch_user_activities(user, reading_user, params \\ %{}) do
672 |> Map.put("type", ["Create", "Announce"])
673 |> Map.put("user", reading_user)
674 |> Map.put("actor_id", user.ap_id)
675 |> Map.put("pinned_activity_ids", user.pinned_activities)
678 if User.blocks?(reading_user, user) do
682 |> Map.put("blocking_user", reading_user)
683 |> Map.put("muting_user", reading_user)
687 user_activities_recipients(%{
688 "godmode" => params["godmode"],
689 "reading_user" => reading_user
692 fetch_activities(recipients, params)
696 def fetch_statuses(reading_user, params) do
699 |> Map.put("type", ["Create", "Announce"])
702 user_activities_recipients(%{
703 "godmode" => params["godmode"],
704 "reading_user" => reading_user
707 fetch_activities(recipients, params, :offset)
711 defp user_activities_recipients(%{"godmode" => true}) do
715 defp user_activities_recipients(%{"reading_user" => reading_user}) do
717 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
719 [Constants.as_public()]
723 defp restrict_since(query, %{"since_id" => ""}), do: query
725 defp restrict_since(query, %{"since_id" => since_id}) do
726 from(activity in query, where: activity.id > ^since_id)
729 defp restrict_since(query, _), do: query
731 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
732 raise "Can't use the child object without preloading!"
735 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
736 when is_list(tag_reject) and tag_reject != [] do
738 [_activity, object] in query,
739 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
743 defp restrict_tag_reject(query, _), do: query
745 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
746 raise "Can't use the child object without preloading!"
749 defp restrict_tag_all(query, %{"tag_all" => tag_all})
750 when is_list(tag_all) and tag_all != [] do
752 [_activity, object] in query,
753 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
757 defp restrict_tag_all(query, _), do: query
759 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
760 raise "Can't use the child object without preloading!"
763 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
765 [_activity, object] in query,
766 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
770 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
772 [_activity, object] in query,
773 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
777 defp restrict_tag(query, _), do: query
779 defp restrict_recipients(query, [], _user), do: query
781 defp restrict_recipients(query, recipients, nil) do
782 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
785 defp restrict_recipients(query, recipients, user) do
788 where: fragment("? && ?", ^recipients, activity.recipients),
789 or_where: activity.actor == ^user.ap_id
793 defp restrict_local(query, %{"local_only" => true}) do
794 from(activity in query, where: activity.local == true)
797 defp restrict_local(query, _), do: query
799 defp restrict_actor(query, %{"actor_id" => actor_id}) do
800 from(activity in query, where: activity.actor == ^actor_id)
803 defp restrict_actor(query, _), do: query
805 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
806 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
809 defp restrict_type(query, %{"type" => type}) do
810 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
813 defp restrict_type(query, _), do: query
815 defp restrict_state(query, %{"state" => state}) do
816 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
819 defp restrict_state(query, _), do: query
821 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
823 [_activity, object] in query,
824 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
828 defp restrict_favorited_by(query, _), do: query
830 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
831 raise "Can't use the child object without preloading!"
834 defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
836 [_activity, object] in query,
837 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
841 defp restrict_media(query, _), do: query
843 defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
845 [_activity, object] in query,
846 where: fragment("?->>'inReplyTo' is null", object.data)
850 defp restrict_replies(query, %{
851 "reply_filtering_user" => user,
852 "reply_visibility" => "self"
855 [activity, object] in query,
858 "?->>'inReplyTo' is null OR ? = ANY(?)",
866 defp restrict_replies(query, %{
867 "reply_filtering_user" => user,
868 "reply_visibility" => "following"
871 [activity, object] in query,
874 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
876 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
885 defp restrict_replies(query, _), do: query
887 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do
888 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
891 defp restrict_reblogs(query, _), do: query
893 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
895 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
896 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
899 from([activity] in query,
900 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
901 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
904 unless opts["skip_preload"] do
905 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
911 defp restrict_muted(query, _), do: query
913 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
914 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
915 domain_blocks = user.domain_blocks || []
917 following_ap_ids = User.get_friends_ap_ids(user)
920 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
923 [activity, object: o] in query,
924 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
925 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
928 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
935 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
943 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
952 defp restrict_blocked(query, _), do: query
954 defp restrict_unlisted(query) do
959 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
961 ^[Constants.as_public()]
966 # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
967 # the same for `restrict_media/2`, `restrict_replies/2`, 'restrict_reblogs/2'
968 # and `restrict_muted/2`
970 defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => ids})
971 when pinned in [true, "true", "1"] do
972 from(activity in query, where: activity.id in ^ids)
975 defp restrict_pinned(query, _), do: query
977 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
978 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
984 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
992 defp restrict_muted_reblogs(query, _), do: query
994 defp restrict_instance(query, %{"instance" => instance}) do
999 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1003 from(activity in query, where: activity.actor in ^users)
1006 defp restrict_instance(query, _), do: query
1008 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1010 defp exclude_poll_votes(query, _) do
1011 if has_named_binding?(query, :object) do
1012 from([activity, object: o] in query,
1013 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1020 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
1021 from(activity in query, where: activity.id != ^id)
1024 defp exclude_id(query, _), do: query
1026 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
1028 defp maybe_preload_objects(query, _) do
1030 |> Activity.with_preloaded_object()
1033 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
1035 defp maybe_preload_bookmarks(query, opts) do
1037 |> Activity.with_preloaded_bookmark(opts["user"])
1040 defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
1042 |> Activity.with_preloaded_report_notes()
1045 defp maybe_preload_report_notes(query, _), do: query
1047 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
1049 defp maybe_set_thread_muted_field(query, opts) do
1051 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
1054 defp maybe_order(query, %{order: :desc}) do
1056 |> order_by(desc: :id)
1059 defp maybe_order(query, %{order: :asc}) do
1061 |> order_by(asc: :id)
1064 defp maybe_order(query, _), do: query
1066 defp fetch_activities_query_ap_ids_ops(opts) do
1067 source_user = opts["muting_user"]
1068 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1070 ap_id_relationships =
1071 ap_id_relationships ++
1072 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1078 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1080 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1081 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1083 restrict_muted_reblogs_opts =
1084 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1086 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1089 def fetch_activities_query(recipients, opts \\ %{}) do
1090 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1091 fetch_activities_query_ap_ids_ops(opts)
1094 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1098 |> maybe_preload_objects(opts)
1099 |> maybe_preload_bookmarks(opts)
1100 |> maybe_preload_report_notes(opts)
1101 |> maybe_set_thread_muted_field(opts)
1102 |> maybe_order(opts)
1103 |> restrict_recipients(recipients, opts["user"])
1104 |> restrict_replies(opts)
1105 |> restrict_tag(opts)
1106 |> restrict_tag_reject(opts)
1107 |> restrict_tag_all(opts)
1108 |> restrict_since(opts)
1109 |> restrict_local(opts)
1110 |> restrict_actor(opts)
1111 |> restrict_type(opts)
1112 |> restrict_state(opts)
1113 |> restrict_favorited_by(opts)
1114 |> restrict_blocked(restrict_blocked_opts)
1115 |> restrict_muted(restrict_muted_opts)
1116 |> restrict_media(opts)
1117 |> restrict_visibility(opts)
1118 |> restrict_thread_visibility(opts, config)
1119 |> restrict_reblogs(opts)
1120 |> restrict_pinned(opts)
1121 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1122 |> restrict_instance(opts)
1123 |> Activity.restrict_deactivated_users()
1124 |> exclude_poll_votes(opts)
1125 |> exclude_visibility(opts)
1128 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1129 list_memberships = Pleroma.List.memberships(opts["user"])
1131 fetch_activities_query(recipients ++ list_memberships, opts)
1132 |> Pagination.fetch_paginated(opts, pagination)
1134 |> maybe_update_cc(list_memberships, opts["user"])
1138 Fetch favorites activities of user with order by sort adds to favorites
1140 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1141 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1143 |> Activity.Queries.by_actor()
1144 |> Activity.Queries.by_type("Like")
1145 |> Activity.with_joined_object()
1146 |> Object.with_joined_activity()
1147 |> select([_like, object, activity], %{activity | object: object})
1148 |> order_by([like, _, _], desc: like.id)
1149 |> Pagination.fetch_paginated(
1150 Map.merge(params, %{"skip_order" => true}),
1156 defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id})
1157 when is_list(list_memberships) and length(list_memberships) > 0 do
1158 Enum.map(activities, fn
1159 %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 ->
1160 if Enum.any?(bcc, &(&1 in list_memberships)) do
1161 update_in(activity.data["cc"], &[user_ap_id | &1])
1171 defp maybe_update_cc(activities, _, _), do: activities
1173 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1174 from(activity in query,
1176 fragment("? && ?", activity.recipients, ^recipients) or
1177 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1178 ^Constants.as_public() in activity.recipients)
1182 def fetch_activities_bounded(
1184 recipients_with_public,
1186 pagination \\ :keyset
1188 fetch_activities_query([], opts)
1189 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1190 |> Pagination.fetch_paginated(opts, pagination)
1194 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1195 def upload(file, opts \\ []) do
1196 with {:ok, data} <- Upload.store(file, opts) do
1199 Map.put(data, "actor", opts[:actor])
1204 Repo.insert(%Object{data: obj_data})
1208 @spec get_actor_url(any()) :: binary() | nil
1209 defp get_actor_url(url) when is_binary(url), do: url
1210 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1212 defp get_actor_url(url) when is_list(url) do
1218 defp get_actor_url(_url), do: nil
1220 defp object_to_user_data(data) do
1222 data["icon"]["url"] &&
1225 "url" => [%{"href" => data["icon"]["url"]}]
1229 data["image"]["url"] &&
1232 "url" => [%{"href" => data["image"]["url"]}]
1237 |> Map.get("attachment", [])
1238 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1239 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1243 |> Map.get("tag", [])
1245 %{"type" => "Emoji"} -> true
1248 |> Enum.reduce(%{}, fn %{"icon" => %{"url" => url}, "name" => name}, acc ->
1249 Map.put(acc, String.trim(name, ":"), url)
1252 locked = data["manuallyApprovesFollowers"] || false
1253 data = Transmogrifier.maybe_fix_user_object(data)
1254 discoverable = data["discoverable"] || false
1255 invisible = data["invisible"] || false
1256 actor_type = data["type"] || "Person"
1259 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1260 data["publicKey"]["publicKeyPem"]
1266 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1267 data["endpoints"]["sharedInbox"]
1274 uri: get_actor_url(data["url"]),
1280 discoverable: discoverable,
1281 invisible: invisible,
1284 follower_address: data["followers"],
1285 following_address: data["following"],
1286 bio: data["summary"],
1287 actor_type: actor_type,
1288 also_known_as: Map.get(data, "alsoKnownAs", []),
1289 public_key: public_key,
1290 inbox: data["inbox"],
1291 shared_inbox: shared_inbox
1294 # nickname can be nil because of virtual actors
1296 if data["preferredUsername"] do
1300 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1303 Map.put(user_data, :nickname, nil)
1309 def fetch_follow_information_for_user(user) do
1310 with {:ok, following_data} <-
1311 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1312 {:ok, hide_follows} <- collection_private(following_data),
1313 {:ok, followers_data} <-
1314 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1315 {:ok, hide_followers} <- collection_private(followers_data) do
1318 hide_follows: hide_follows,
1319 follower_count: normalize_counter(followers_data["totalItems"]),
1320 following_count: normalize_counter(following_data["totalItems"]),
1321 hide_followers: hide_followers
1324 {:error, _} = e -> e
1329 defp normalize_counter(counter) when is_integer(counter), do: counter
1330 defp normalize_counter(_), do: 0
1332 def maybe_update_follow_information(user_data) do
1333 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1334 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1336 {:collections_available,
1337 !!(user_data[:following_address] && user_data[:follower_address])},
1339 fetch_follow_information_for_user(user_data) do
1340 info = Map.merge(user_data[:info] || %{}, info)
1343 |> Map.put(:info, info)
1345 {:user_type_check, false} ->
1348 {:collections_available, false} ->
1351 {:enabled, false} ->
1356 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1363 defp collection_private(%{"first" => %{"type" => type}})
1364 when type in ["CollectionPage", "OrderedCollectionPage"],
1367 defp collection_private(%{"first" => first}) do
1368 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1369 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1372 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1373 {:error, _} = e -> e
1378 defp collection_private(_data), do: {:ok, true}
1380 def user_data_from_user_object(data) do
1381 with {:ok, data} <- MRF.filter(data),
1382 {:ok, data} <- object_to_user_data(data) do
1389 def fetch_and_prepare_user_from_ap_id(ap_id) do
1390 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1391 {:ok, data} <- user_data_from_user_object(data),
1392 data <- maybe_update_follow_information(data) do
1395 {:error, "Object has been deleted"} = e ->
1396 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1400 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1405 def make_user_from_ap_id(ap_id) do
1406 user = User.get_cached_by_ap_id(ap_id)
1408 if user && !User.ap_enabled?(user) do
1409 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1411 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1414 |> User.remote_user_changeset(data)
1415 |> User.update_and_set_cache()
1418 |> User.remote_user_changeset()
1428 def make_user_from_nickname(nickname) do
1429 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1430 make_user_from_ap_id(ap_id)
1432 _e -> {:error, "No AP id in WebFinger"}
1436 # filter out broken threads
1437 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1438 entire_thread_visible_for_user?(activity, user)
1441 # do post-processing on a specific activity
1442 def contain_activity(%Activity{} = activity, %User{} = user) do
1443 contain_broken_threads(activity, user)
1446 def fetch_direct_messages_query do
1448 |> restrict_type(%{"type" => "Create"})
1449 |> restrict_visibility(%{visibility: "direct"})
1450 |> order_by([activity], asc: activity.id)