1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
70 defp check_actor_is_active(nil), do: true
72 defp check_actor_is_active(actor) when is_binary(actor) do
73 case User.get_cached_by_ap_id(actor) do
74 %User{deactivated: deactivated} -> not deactivated
# Checks the object's content against the configured `[:instance, :remote_limit]`.
# Passes when the content length does not exceed that limit.
defp check_remote_limit(%{"object" => %{"content" => text}}) when not is_nil(text) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(text)

  content_length <= max_length
end

# No non-nil content under "object": there is nothing to measure, so the check passes.
defp check_remote_limit(_activity), do: true
# Increases the actor's note count when `object` is public; otherwise returns
# the actor as-is wrapped in `{:ok, actor}`.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Decreases the actor's note count when `object` is public; otherwise returns
# the actor as-is wrapped in `{:ok, actor}`.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
94 defp increase_replies_count_if_reply(%{
95 "object" => %{"inReplyTo" => reply_ap_id} = object,
98 if is_public?(object) do
99 Object.increase_replies_count(reply_ap_id)
103 defp increase_replies_count_if_reply(_create_data), do: :noop
105 defp increase_poll_votes_if_vote(%{
106 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
110 Object.increase_vote_count(reply_ap_id, name, actor)
113 defp increase_poll_votes_if_vote(_create_data), do: :noop
115 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
116 def persist(object, meta) do
117 with local <- Keyword.fetch!(meta, :local),
118 {recipients, _, _} <- get_recipients(object),
120 Repo.insert(%Activity{
123 recipients: recipients,
124 actor: object["actor"]
126 {:ok, activity, meta}
130 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
131 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
132 with nil <- Activity.normalize(map),
133 map <- lazy_put_activity_defaults(map, fake),
134 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
135 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
136 {:ok, map} <- MRF.filter(map),
137 {recipients, _, _} = get_recipients(map),
138 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
139 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
140 {:ok, map, object} <- insert_full_object(map) do
142 Repo.insert(%Activity{
146 recipients: recipients
149 # Splice in the child object if we have one.
150 activity = Maps.put_if_present(activity, :object, object)
152 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
156 %Activity{} = activity ->
159 {:fake, true, map, recipients} ->
160 activity = %Activity{
164 recipients: recipients,
168 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
176 def notify_and_stream(activity) do
177 Notification.create_notifications(activity)
179 conversation = create_or_bump_conversation(activity, activity.actor)
180 participations = get_participations(conversation)
182 stream_out_participations(participations)
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
193 defp get_participations({:ok, conversation}) do
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
199 defp get_participations(_), do: []
201 def stream_out_participations(participations) do
204 |> Repo.preload(:user)
206 Streamer.stream("participation", participations)
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
214 fetch_latest_activity_id_for_context(conversation.ap_id, %{
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
225 def stream_out_participations(_, _), do: :noop
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
234 def stream_out(_activity) do
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # `local` is true unless params[:local] is exactly `false` (nil or any other value counts as true)
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
254 %{to: to, actor: actor, published: published, context: context, object: object},
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
268 {:quick_insert, true, activity} ->
271 {:fake, true, activity} ->
275 Repo.rollback(message)
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # `local` is true unless params[:local] is exactly `false` (nil or any other value counts as true)
283 local = !(params[:local] == false)
284 published = params[:published]
288 %{to: to, actor: actor, published: published, context: context, object: object},
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
# Delegates to `accept_or_reject/2` with the activity type "Accept".
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
# Delegates to `accept_or_reject/2` with the activity type "Reject".
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
325 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
326 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
327 local = !(params[:local] == false)
328 activity_id = params[:activity_id]
338 |> Maps.put_if_present("id", activity_id)
340 with {:ok, activity} <- insert(data, local),
341 _ <- notify_and_stream(activity),
342 :ok <- maybe_federate(activity) do
347 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
348 {:ok, Activity.t()} | {:error, any()}
349 def follow(follower, followed, activity_id \\ nil, local \\ true) do
350 with {:ok, result} <-
351 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
356 defp do_follow(follower, followed, activity_id, local) do
357 data = make_follow_data(follower, followed, activity_id)
359 with {:ok, activity} <- insert(data, local),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
364 {:error, error} -> Repo.rollback(error)
368 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
369 {:ok, Activity.t()} | nil | {:error, any()}
370 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
371 with {:ok, result} <-
372 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
377 defp do_unfollow(follower, followed, activity_id, local) do
378 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
379 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
380 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
381 {:ok, activity} <- insert(unfollow_data, local),
382 _ <- notify_and_stream(activity),
383 :ok <- maybe_federate(activity) do
387 {:error, error} -> Repo.rollback(error)
391 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
392 {:ok, Activity.t()} | {:error, any()}
393 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
394 with {:ok, result} <-
395 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
400 defp do_block(blocker, blocked, activity_id, local) do
401 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
403 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
404 unfollow(blocker, blocked, nil, local)
407 block_data = make_block_data(blocker, blocked, activity_id)
409 with {:ok, activity} <- insert(block_data, local),
410 _ <- notify_and_stream(activity),
411 :ok <- maybe_federate(activity) do
414 {:error, error} -> Repo.rollback(error)
418 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
428 # `local` and `forward` are true unless the corresponding param is exactly `false`
429 local = !(params[:local] == false)
430 forward = !(params[:forward] == false)
432 additional = params[:additional] || %{}
436 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
438 Map.merge(additional, %{"to" => [], "cc" => []})
441 with flag_data <- make_flag_data(params, additional),
442 {:ok, activity} <- insert(flag_data, local),
443 {:ok, stripped_activity} <- strip_report_status_data(activity),
444 _ <- notify_and_stream(activity),
445 :ok <- maybe_federate(stripped_activity) do
446 User.all_superusers()
447 |> Enum.filter(fn user -> not is_nil(user.email) end)
448 |> Enum.each(fn superuser ->
450 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
451 |> Pleroma.Emails.Mailer.deliver_async()
458 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
459 def move(%User{} = origin, %User{} = target, local \\ true) do
462 "actor" => origin.ap_id,
463 "object" => origin.ap_id,
464 "target" => target.ap_id
467 with true <- origin.ap_id in target.also_known_as,
468 {:ok, activity} <- insert(params, local),
469 _ <- notify_and_stream(activity) do
470 maybe_federate(activity)
472 BackgroundWorker.enqueue("move_following", %{
473 "origin_id" => origin.id,
474 "target_id" => target.id
479 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
484 def fetch_activities_for_context_query(context, opts) do
485 public = [Constants.as_public()]
489 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
492 from(activity in Activity)
493 |> maybe_preload_objects(opts)
494 |> maybe_preload_bookmarks(opts)
495 |> maybe_set_thread_muted_field(opts)
496 |> restrict_blocked(opts)
497 |> restrict_recipients(recipients, opts[:user])
501 "?->>'type' = ? and ?->>'context' = ?",
508 |> exclude_poll_votes(opts)
510 |> order_by([activity], desc: activity.id)
513 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
514 def fetch_activities_for_context(context, opts \\ %{}) do
516 |> fetch_activities_for_context_query(opts)
520 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
521 FlakeId.Ecto.CompatType.t() | nil
522 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
524 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
# Fetches activities addressed to the public collection.
#
# The `:user` key is dropped from `opts` before the query is built. The query is
# then passed through `restrict_unlisted/2` and paginated using `pagination`.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
540 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
541 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
543 |> Map.put(:restrict_unlisted, true)
544 |> fetch_public_or_unlisted_activities(pagination)
547 @valid_visibilities ~w[direct unlisted public private]
549 defp restrict_visibility(query, %{visibility: visibility})
550 when is_list(visibility) do
551 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
556 "activity_visibility(?, ?, ?) = ANY (?)",
564 Logger.error("Could not restrict visibility to #{visibility}")
568 defp restrict_visibility(query, %{visibility: visibility})
569 when visibility in @valid_visibilities do
573 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Handles a `:visibility` option that is not one of @valid_visibilities by
# logging an error.
#
# The offending value is rendered with `inspect/1`: plain interpolation would
# raise `Protocol.UndefinedError` for terms without a `String.Chars`
# implementation (maps, tuples, ...), turning a bad option into a crash inside
# the log call itself.
#
# NOTE(review): this clause evaluates to the result of `Logger.error/1`, not to
# a query. That behaviour is preserved here; confirm callers never pipe the
# result onwards.
defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
end

# No `:visibility` option present: leave the query untouched.
defp restrict_visibility(query, _visibility), do: query
584 defp exclude_visibility(query, %{exclude_visibilities: visibility})
585 when is_list(visibility) do
586 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
591 "activity_visibility(?, ?, ?) = ANY (?)",
599 Logger.error("Could not exclude visibility to #{visibility}")
604 defp exclude_visibility(query, %{exclude_visibilities: visibility})
605 when visibility in @valid_visibilities do
610 "activity_visibility(?, ?, ?) = ?",
619 defp exclude_visibility(query, %{exclude_visibilities: visibility})
620 when visibility not in [nil | @valid_visibilities] do
621 Logger.error("Could not exclude visibility to #{visibility}")
625 defp exclude_visibility(query, _visibility), do: query
627 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
630 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
633 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
636 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
640 defp restrict_thread_visibility(query, _, _), do: query
642 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
645 |> Map.put(:user, reading_user)
646 |> Map.put(:actor_id, user.ap_id)
649 godmode: params[:godmode],
650 reading_user: reading_user
652 |> user_activities_recipients()
653 |> fetch_activities(params)
657 def fetch_user_activities(user, reading_user, params \\ %{}) do
660 |> Map.put(:type, ["Create", "Announce"])
661 |> Map.put(:user, reading_user)
662 |> Map.put(:actor_id, user.ap_id)
663 |> Map.put(:pinned_activity_ids, user.pinned_activities)
666 if User.blocks?(reading_user, user) do
670 |> Map.put(:blocking_user, reading_user)
671 |> Map.put(:muting_user, reading_user)
675 godmode: params[:godmode],
676 reading_user: reading_user
678 |> user_activities_recipients()
679 |> fetch_activities(params)
683 def fetch_statuses(reading_user, params) do
684 params = Map.put(params, :type, ["Create", "Announce"])
687 godmode: params[:godmode],
688 reading_user: reading_user
690 |> user_activities_recipients()
691 |> fetch_activities(params, :offset)
695 defp user_activities_recipients(%{godmode: true}), do: []
697 defp user_activities_recipients(%{reading_user: reading_user}) do
699 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
701 [Constants.as_public()]
# An empty `:since_id` applies no lower bound.
defp restrict_since(query, %{since_id: ""}), do: query

# Keeps only activities whose id is greater than `:since_id`.
defp restrict_since(query, %{since_id: min_id}),
  do: from(a in query, where: a.id > ^min_id)

# No `:since_id` option: leave the query untouched.
defp restrict_since(query, _opts), do: query
713 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
714 raise "Can't use the child object without preloading!"
717 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
719 [_activity, object] in query,
720 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
724 defp restrict_tag_reject(query, _), do: query
726 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
727 raise "Can't use the child object without preloading!"
730 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
732 [_activity, object] in query,
733 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
737 defp restrict_tag_all(query, _), do: query
739 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
740 raise "Can't use the child object without preloading!"
743 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
745 [_activity, object] in query,
746 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
750 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
752 [_activity, object] in query,
753 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
757 defp restrict_tag(query, _), do: query
# An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

# Without a user: keep activities whose recipients array overlaps (`&&`) `recipients`.
defp restrict_recipients(query, recipients, nil),
  do: from(a in query, where: fragment("? && ?", ^recipients, a.recipients))
765 defp restrict_recipients(query, recipients, user) do
768 where: fragment("? && ?", ^recipients, activity.recipients),
769 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keep only activities flagged as local.
defp restrict_local(query, %{local_only: true}),
  do: from(a in query, where: a.local == true)

# Otherwise leave the query untouched.
defp restrict_local(query, _opts), do: query
# Keeps only activities whose actor equals `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_ap_id}),
  do: from(a in query, where: a.actor == ^actor_ap_id)

# No `:actor_id` option: leave the query untouched.
defp restrict_actor(query, _opts), do: query
# A single type name: the activity's "type" must equal it.
defp restrict_type(query, %{type: type_name}) when is_binary(type_name),
  do: from(a in query, where: fragment("?->>'type' = ?", a.data, ^type_name))

# Any other `:type` value is handed to `ANY(?)`, i.e. treated as a collection of type names.
defp restrict_type(query, %{type: type_names}),
  do: from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^type_names))

# No `:type` option: leave the query untouched.
defp restrict_type(query, _opts), do: query
# Keeps only activities whose "state" field in the data equals `:state`.
defp restrict_state(query, %{state: wanted_state}),
  do: from(a in query, where: fragment("?->>'state' = ?", a.data, ^wanted_state))

# No `:state` option: leave the query untouched.
defp restrict_state(query, _opts), do: query
801 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
803 [_activity, object] in query,
804 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
808 defp restrict_favorited_by(query, _), do: query
810 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
811 raise "Can't use the child object without preloading!"
814 defp restrict_media(query, %{only_media: true}) do
816 [_activity, object] in query,
817 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
821 defp restrict_media(query, _), do: query
823 defp restrict_replies(query, %{exclude_replies: true}) do
825 [_activity, object] in query,
826 where: fragment("?->>'inReplyTo' is null", object.data)
830 defp restrict_replies(query, %{
831 reply_filtering_user: user,
832 reply_visibility: "self"
835 [activity, object] in query,
838 "?->>'inReplyTo' is null OR ? = ANY(?)",
846 defp restrict_replies(query, %{
847 reply_filtering_user: user,
848 reply_visibility: "following"
851 [activity, object] in query,
854 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
856 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
865 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drop activities whose type is "Announce".
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

# Otherwise leave the query untouched.
defp restrict_reblogs(query, _opts), do: query
873 defp restrict_muted(query, %{with_muted: true}), do: query
875 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
876 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
879 from([activity] in query,
880 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
881 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
884 unless opts[:skip_preload] do
885 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
891 defp restrict_muted(query, _), do: query
893 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
894 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
895 domain_blocks = user.domain_blocks || []
897 following_ap_ids = User.get_friends_ap_ids(user)
900 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
903 [activity, object: o] in query,
904 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
905 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
908 "recipients_contain_blocked_domains(?, ?) = false",
914 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
921 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
929 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
938 defp restrict_blocked(query, _), do: query
940 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
945 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
947 ^[Constants.as_public()]
952 defp restrict_unlisted(query, _), do: query
# With `pinned: true`, keep only activities whose id is in `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}),
  do: from(a in query, where: a.id in ^pinned_ids)

# Otherwise leave the query untouched.
defp restrict_pinned(query, _opts), do: query
960 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
961 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
967 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
975 defp restrict_muted_reblogs(query, _), do: query
977 defp restrict_instance(query, %{instance: instance}) do
982 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
986 from(activity in query, where: activity.actor in ^users)
989 defp restrict_instance(query, _), do: query
991 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
993 defp exclude_poll_votes(query, _) do
994 if has_named_binding?(query, :object) do
995 from([activity, object: o] in query,
996 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1003 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1005 defp exclude_invisible_actors(query, _opts) do
1007 User.Query.build(%{invisible: true, select: [:ap_id]})
1009 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1011 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity whose id equals `:exclude_id` (only when that option is a binary).
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id),
  do: from(a in query, where: a.id != ^excluded_id)

# Otherwise leave the query untouched.
defp exclude_id(query, _opts), do: query
1020 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1022 defp maybe_preload_objects(query, _) do
1024 |> Activity.with_preloaded_object()
1027 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1029 defp maybe_preload_bookmarks(query, opts) do
1031 |> Activity.with_preloaded_bookmark(opts[:user])
1034 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1036 |> Activity.with_preloaded_report_notes()
1039 defp maybe_preload_report_notes(query, _), do: query
1041 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1043 defp maybe_set_thread_muted_field(query, opts) do
1045 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1048 defp maybe_order(query, %{order: :desc}) do
1050 |> order_by(desc: :id)
1053 defp maybe_order(query, %{order: :asc}) do
1055 |> order_by(asc: :id)
1058 defp maybe_order(query, _), do: query
1060 defp fetch_activities_query_ap_ids_ops(opts) do
1061 source_user = opts[:muting_user]
1062 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1064 ap_id_relationships =
1065 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1066 [:block | ap_id_relationships]
1071 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1073 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1074 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1076 restrict_muted_reblogs_opts =
1077 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1079 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1082 def fetch_activities_query(recipients, opts \\ %{}) do
1083 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1084 fetch_activities_query_ap_ids_ops(opts)
1087 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1091 |> maybe_preload_objects(opts)
1092 |> maybe_preload_bookmarks(opts)
1093 |> maybe_preload_report_notes(opts)
1094 |> maybe_set_thread_muted_field(opts)
1095 |> maybe_order(opts)
1096 |> restrict_recipients(recipients, opts[:user])
1097 |> restrict_replies(opts)
1098 |> restrict_tag(opts)
1099 |> restrict_tag_reject(opts)
1100 |> restrict_tag_all(opts)
1101 |> restrict_since(opts)
1102 |> restrict_local(opts)
1103 |> restrict_actor(opts)
1104 |> restrict_type(opts)
1105 |> restrict_state(opts)
1106 |> restrict_favorited_by(opts)
1107 |> restrict_blocked(restrict_blocked_opts)
1108 |> restrict_muted(restrict_muted_opts)
1109 |> restrict_media(opts)
1110 |> restrict_visibility(opts)
1111 |> restrict_thread_visibility(opts, config)
1112 |> restrict_reblogs(opts)
1113 |> restrict_pinned(opts)
1114 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1115 |> restrict_instance(opts)
1116 |> Activity.restrict_deactivated_users()
1117 |> exclude_poll_votes(opts)
1118 |> exclude_invisible_actors(opts)
1119 |> exclude_visibility(opts)
1122 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1123 list_memberships = Pleroma.List.memberships(opts[:user])
1125 fetch_activities_query(recipients ++ list_memberships, opts)
1126 |> Pagination.fetch_paginated(opts, pagination)
1128 |> maybe_update_cc(list_memberships, opts[:user])
1132 Fetches the activities a user has favourited, ordered by the id of the corresponding Like activity (descending)
1134 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1135 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1137 |> Activity.Queries.by_actor()
1138 |> Activity.Queries.by_type("Like")
1139 |> Activity.with_joined_object()
1140 |> Object.with_joined_activity()
1141 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1142 |> order_by([like, _, _], desc_nulls_last: like.id)
1143 |> Pagination.fetch_paginated(
1144 Map.merge(params, %{skip_order: true}),
1149 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1150 Enum.map(activities, fn
1151 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1152 if Enum.any?(bcc, &(&1 in list_memberships)) do
1153 update_in(activity.data["cc"], &[user_ap_id | &1])
1163 defp maybe_update_cc(activities, _, _), do: activities
1165 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1166 from(activity in query,
1168 fragment("? && ?", activity.recipients, ^recipients) or
1169 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1170 ^Constants.as_public() in activity.recipients)
1174 def fetch_activities_bounded(
1176 recipients_with_public,
1178 pagination \\ :keyset
1180 fetch_activities_query([], opts)
1181 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1182 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` through `Upload.store/2` and inserts the resulting data as an
# `Object`. When `opts[:actor]` is present it is added under the "actor" key.
# Any non-`{:ok, data}` result from the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1195 @spec get_actor_url(any()) :: binary() | nil
1196 defp get_actor_url(url) when is_binary(url), do: url
1197 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1199 defp get_actor_url(url) when is_list(url) do
1205 defp get_actor_url(_url), do: nil
1207 defp object_to_user_data(data) do
1209 data["icon"]["url"] &&
1212 "url" => [%{"href" => data["icon"]["url"]}]
1216 data["image"]["url"] &&
1219 "url" => [%{"href" => data["image"]["url"]}]
1224 |> Map.get("attachment", [])
1225 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1226 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1230 |> Map.get("tag", [])
1232 %{"type" => "Emoji"} -> true
1235 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1236 {String.trim(name, ":"), url}
1239 locked = data["manuallyApprovesFollowers"] || false
1240 data = Transmogrifier.maybe_fix_user_object(data)
1241 discoverable = data["discoverable"] || false
1242 invisible = data["invisible"] || false
1243 actor_type = data["type"] || "Person"
1246 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1247 data["publicKey"]["publicKeyPem"]
1253 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1254 data["endpoints"]["sharedInbox"]
1261 uri: get_actor_url(data["url"]),
1267 discoverable: discoverable,
1268 invisible: invisible,
1271 follower_address: data["followers"],
1272 following_address: data["following"],
1273 bio: data["summary"],
1274 actor_type: actor_type,
1275 also_known_as: Map.get(data, "alsoKnownAs", []),
1276 public_key: public_key,
1277 inbox: data["inbox"],
1278 shared_inbox: shared_inbox
1281 # nickname can be nil because of virtual actors
1282 if data["preferredUsername"] do
1286 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1289 Map.put(user_data, :nickname, nil)
1293 def fetch_follow_information_for_user(user) do
1294 with {:ok, following_data} <-
1295 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1296 {:ok, hide_follows} <- collection_private(following_data),
1297 {:ok, followers_data} <-
1298 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1299 {:ok, hide_followers} <- collection_private(followers_data) do
1302 hide_follows: hide_follows,
1303 follower_count: normalize_counter(followers_data["totalItems"]),
1304 following_count: normalize_counter(following_data["totalItems"]),
1305 hide_followers: hide_followers
1308 {:error, _} = e -> e
# Normalizes a remote collection counter (e.g. a "totalItems" value).
#
# Returns the value itself when it is a non-negative integer and 0 for anything
# else. Negative integers are rejected too: the value comes from a remote
# server, and a negative item count is not a meaningful counter.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1316 def maybe_update_follow_information(user_data) do
1317 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1318 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1320 {:collections_available,
1321 !!(user_data[:following_address] && user_data[:follower_address])},
1323 fetch_follow_information_for_user(user_data) do
1324 info = Map.merge(user_data[:info] || %{}, info)
1327 |> Map.put(:info, info)
1329 {:user_type_check, false} ->
1332 {:collections_available, false} ->
1335 {:enabled, false} ->
1340 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1347 defp collection_private(%{"first" => %{"type" => type}})
1348 when type in ["CollectionPage", "OrderedCollectionPage"],
1351 defp collection_private(%{"first" => first}) do
1352 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1353 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1356 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1357 {:error, _} = e -> e
1362 defp collection_private(_data), do: {:ok, true}
1364 def user_data_from_user_object(data) do
1365 with {:ok, data} <- MRF.filter(data) do
1366 {:ok, object_to_user_data(data)}
1372 def fetch_and_prepare_user_from_ap_id(ap_id) do
1373 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1374 {:ok, data} <- user_data_from_user_object(data) do
1375 {:ok, maybe_update_follow_information(data)}
1377 {:error, "Object has been deleted" = e} ->
1378 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1382 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1387 def make_user_from_ap_id(ap_id) do
1388 user = User.get_cached_by_ap_id(ap_id)
1390 if user && !User.ap_enabled?(user) do
1391 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1393 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1396 |> User.remote_user_changeset(data)
1397 |> User.update_and_set_cache()
1400 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# reports. Any WebFinger result without a non-nil "ap_id" becomes an error tuple.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently only the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1426 def fetch_direct_messages_query do
1428 |> restrict_type(%{type: "Create"})
1429 |> restrict_visibility(%{visibility: "direct"})
1430 |> order_by([activity], asc: activity.id)