1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 defp increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
92 Object.increase_vote_count(reply_ap_id, name, actor)
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
109 Repo.insert(%Activity{
112 recipients: recipients,
113 actor: object["actor"]
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
135 recipients: recipients
138 |> maybe_create_activity_expiration()
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
147 %Activity{} = activity ->
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
155 recipients: recipients,
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
173 stream_out_participations(participations)
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
182 defp maybe_create_activity_expiration(result), do: result
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
192 defp get_participations({:ok, conversation}) do
194 |> Repo.preload(:participations, force: true)
195 |> Map.get(:participations)
198 defp get_participations(_), do: []
200 def stream_out_participations(participations) do
203 |> Repo.preload(:user)
205 Streamer.stream("participation", participations)
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
213 fetch_latest_activity_id_for_context(conversation.ap_id, %{
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
224 def stream_out_participations(_, _), do: :noop
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
233 def stream_out(_activity) do
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
253 %{to: to, actor: actor, published: published, context: context, object: object},
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
267 {:quick_insert, true, activity} ->
270 {:fake, true, activity} ->
274 Repo.rollback(message)
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
287 %{to: to, actor: actor, published: published, context: context, object: object},
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
298 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
299 def accept(params) do
300 accept_or_reject("Accept", params)
303 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
304 def reject(params) do
305 accept_or_reject("Reject", params)
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
325 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
326 local = !(params[:local] == false)
327 activity_id = params[:activity_id]
337 |> Maps.put_if_present("id", activity_id)
339 with {:ok, activity} <- insert(data, local),
340 _ <- notify_and_stream(activity),
341 :ok <- maybe_federate(activity) do
346 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
347 {:ok, Activity.t()} | {:error, any()}
348 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
355 defp do_follow(follower, followed, activity_id, local, opts) do
356 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
357 data = make_follow_data(follower, followed, activity_id)
359 with {:ok, activity} <- insert(data, local),
360 _ <- skip_notify_and_stream || notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
364 {:error, error} -> Repo.rollback(error)
368 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
369 {:ok, Activity.t()} | nil | {:error, any()}
370 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
371 with {:ok, result} <-
372 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
377 defp do_unfollow(follower, followed, activity_id, local) do
378 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
379 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
380 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
381 {:ok, activity} <- insert(unfollow_data, local),
382 _ <- notify_and_stream(activity),
383 :ok <- maybe_federate(activity) do
387 {:error, error} -> Repo.rollback(error)
391 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
392 {:ok, Activity.t()} | {:error, any()}
393 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
394 with {:ok, result} <-
395 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
400 defp do_block(blocker, blocked, activity_id, local) do
401 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
403 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
404 unfollow(blocker, blocked, nil, local)
407 block_data = make_block_data(blocker, blocked, activity_id)
409 with {:ok, activity} <- insert(block_data, local),
410 _ <- notify_and_stream(activity),
411 :ok <- maybe_federate(activity) do
414 {:error, error} -> Repo.rollback(error)
418 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
428 # only accept false as false value
429 local = !(params[:local] == false)
430 forward = !(params[:forward] == false)
432 additional = params[:additional] || %{}
436 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
438 Map.merge(additional, %{"to" => [], "cc" => []})
441 with flag_data <- make_flag_data(params, additional),
442 {:ok, activity} <- insert(flag_data, local),
443 {:ok, stripped_activity} <- strip_report_status_data(activity),
444 _ <- notify_and_stream(activity),
445 :ok <- maybe_federate(stripped_activity) do
446 User.all_superusers()
447 |> Enum.filter(fn user -> not is_nil(user.email) end)
448 |> Enum.each(fn superuser ->
450 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
451 |> Pleroma.Emails.Mailer.deliver_async()
458 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
459 def move(%User{} = origin, %User{} = target, local \\ true) do
462 "actor" => origin.ap_id,
463 "object" => origin.ap_id,
464 "target" => target.ap_id
467 with true <- origin.ap_id in target.also_known_as,
468 {:ok, activity} <- insert(params, local),
469 _ <- notify_and_stream(activity) do
470 maybe_federate(activity)
472 BackgroundWorker.enqueue("move_following", %{
473 "origin_id" => origin.id,
474 "target_id" => target.id
479 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
484 def fetch_activities_for_context_query(context, opts) do
485 public = [Constants.as_public()]
489 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
492 from(activity in Activity)
493 |> maybe_preload_objects(opts)
494 |> maybe_preload_bookmarks(opts)
495 |> maybe_set_thread_muted_field(opts)
496 |> restrict_blocked(opts)
497 |> restrict_recipients(recipients, opts[:user])
501 "?->>'type' = ? and ?->>'context' = ?",
508 |> exclude_poll_votes(opts)
510 |> order_by([activity], desc: activity.id)
513 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
514 def fetch_activities_for_context(context, opts \\ %{}) do
516 |> fetch_activities_for_context_query(opts)
520 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
521 FlakeId.Ecto.CompatType.t() | nil
522 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
524 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
530 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
531 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
532 opts = Map.delete(opts, :user)
534 [Constants.as_public()]
535 |> fetch_activities_query(opts)
536 |> restrict_unlisted(opts)
537 |> Pagination.fetch_paginated(opts, pagination)
540 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
541 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
543 |> Map.put(:restrict_unlisted, true)
544 |> fetch_public_or_unlisted_activities(pagination)
547 @valid_visibilities ~w[direct unlisted public private]
549 defp restrict_visibility(query, %{visibility: visibility})
550 when is_list(visibility) do
551 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
556 "activity_visibility(?, ?, ?) = ANY (?)",
564 Logger.error("Could not restrict visibility to #{visibility}")
568 defp restrict_visibility(query, %{visibility: visibility})
569 when visibility in @valid_visibilities do
573 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
577 defp restrict_visibility(_query, %{visibility: visibility})
578 when visibility not in @valid_visibilities do
579 Logger.error("Could not restrict visibility to #{visibility}")
582 defp restrict_visibility(query, _visibility), do: query
584 defp exclude_visibility(query, %{exclude_visibilities: visibility})
585 when is_list(visibility) do
586 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
591 "activity_visibility(?, ?, ?) = ANY (?)",
599 Logger.error("Could not exclude visibility to #{visibility}")
604 defp exclude_visibility(query, %{exclude_visibilities: visibility})
605 when visibility in @valid_visibilities do
610 "activity_visibility(?, ?, ?) = ?",
619 defp exclude_visibility(query, %{exclude_visibilities: visibility})
620 when visibility not in [nil | @valid_visibilities] do
621 Logger.error("Could not exclude visibility to #{visibility}")
625 defp exclude_visibility(query, _visibility), do: query
627 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
630 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
633 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
636 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
640 defp restrict_thread_visibility(query, _, _), do: query
642 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
645 |> Map.put(:user, reading_user)
646 |> Map.put(:actor_id, user.ap_id)
649 godmode: params[:godmode],
650 reading_user: reading_user
652 |> user_activities_recipients()
653 |> fetch_activities(params)
657 def fetch_user_activities(user, reading_user, params \\ %{}) do
660 |> Map.put(:type, ["Create", "Announce"])
661 |> Map.put(:user, reading_user)
662 |> Map.put(:actor_id, user.ap_id)
663 |> Map.put(:pinned_activity_ids, user.pinned_activities)
666 if User.blocks?(reading_user, user) do
670 |> Map.put(:blocking_user, reading_user)
671 |> Map.put(:muting_user, reading_user)
675 godmode: params[:godmode],
676 reading_user: reading_user
678 |> user_activities_recipients()
679 |> fetch_activities(params)
683 def fetch_statuses(reading_user, params) do
684 params = Map.put(params, :type, ["Create", "Announce"])
687 godmode: params[:godmode],
688 reading_user: reading_user
690 |> user_activities_recipients()
691 |> fetch_activities(params, :offset)
695 defp user_activities_recipients(%{godmode: true}), do: []
697 defp user_activities_recipients(%{reading_user: reading_user}) do
699 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
701 [Constants.as_public()]
705 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
706 raise "Can't use the child object without preloading!"
709 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
711 [activity, object] in query,
714 "?->>'type' != ? or ?->>'actor' != ?",
723 defp restrict_announce_object_actor(query, _), do: query
725 defp restrict_since(query, %{since_id: ""}), do: query
727 defp restrict_since(query, %{since_id: since_id}) do
728 from(activity in query, where: activity.id > ^since_id)
731 defp restrict_since(query, _), do: query
733 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
734 raise "Can't use the child object without preloading!"
737 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
739 [_activity, object] in query,
740 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
744 defp restrict_tag_reject(query, _), do: query
746 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
747 raise "Can't use the child object without preloading!"
750 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
752 [_activity, object] in query,
753 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
757 defp restrict_tag_all(query, _), do: query
759 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
760 raise "Can't use the child object without preloading!"
763 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
765 [_activity, object] in query,
766 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
770 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
772 [_activity, object] in query,
773 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
777 defp restrict_tag(query, _), do: query
779 defp restrict_recipients(query, [], _user), do: query
781 defp restrict_recipients(query, recipients, nil) do
782 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
785 defp restrict_recipients(query, recipients, user) do
788 where: fragment("? && ?", ^recipients, activity.recipients),
789 or_where: activity.actor == ^user.ap_id
793 defp restrict_local(query, %{local_only: true}) do
794 from(activity in query, where: activity.local == true)
797 defp restrict_local(query, _), do: query
799 defp restrict_actor(query, %{actor_id: actor_id}) do
800 from(activity in query, where: activity.actor == ^actor_id)
803 defp restrict_actor(query, _), do: query
805 defp restrict_type(query, %{type: type}) when is_binary(type) do
806 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
809 defp restrict_type(query, %{type: type}) do
810 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
813 defp restrict_type(query, _), do: query
815 defp restrict_state(query, %{state: state}) do
816 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
819 defp restrict_state(query, _), do: query
821 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
823 [_activity, object] in query,
824 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
828 defp restrict_favorited_by(query, _), do: query
830 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
831 raise "Can't use the child object without preloading!"
834 defp restrict_media(query, %{only_media: true}) do
836 [_activity, object] in query,
837 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
841 defp restrict_media(query, _), do: query
843 defp restrict_replies(query, %{exclude_replies: true}) do
845 [_activity, object] in query,
846 where: fragment("?->>'inReplyTo' is null", object.data)
850 defp restrict_replies(query, %{
851 reply_filtering_user: user,
852 reply_visibility: "self"
855 [activity, object] in query,
858 "?->>'inReplyTo' is null OR ? = ANY(?)",
866 defp restrict_replies(query, %{
867 reply_filtering_user: user,
868 reply_visibility: "following"
871 [activity, object] in query,
874 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
876 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
885 defp restrict_replies(query, _), do: query
887 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
888 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
891 defp restrict_reblogs(query, _), do: query
893 defp restrict_muted(query, %{with_muted: true}), do: query
895 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
896 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
899 from([activity] in query,
900 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
901 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
904 unless opts[:skip_preload] do
905 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
911 defp restrict_muted(query, _), do: query
913 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
914 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
915 domain_blocks = user.domain_blocks || []
917 following_ap_ids = User.get_friends_ap_ids(user)
920 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
923 [activity, object: o] in query,
924 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
925 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
928 "recipients_contain_blocked_domains(?, ?) = false",
934 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
941 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
949 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
958 defp restrict_blocked(query, _), do: query
960 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
965 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
967 ^[Constants.as_public()]
972 defp restrict_unlisted(query, _), do: query
974 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
975 from(activity in query, where: activity.id in ^ids)
978 defp restrict_pinned(query, _), do: query
980 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
981 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
987 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
995 defp restrict_muted_reblogs(query, _), do: query
997 defp restrict_instance(query, %{instance: instance}) do
1002 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1006 from(activity in query, where: activity.actor in ^users)
1009 defp restrict_instance(query, _), do: query
1011 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1013 defp exclude_poll_votes(query, _) do
1014 if has_named_binding?(query, :object) do
1015 from([activity, object: o] in query,
1016 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1023 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1025 defp exclude_chat_messages(query, _) do
1026 if has_named_binding?(query, :object) do
1027 from([activity, object: o] in query,
1028 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1035 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1037 defp exclude_invisible_actors(query, _opts) do
1039 User.Query.build(%{invisible: true, select: [:ap_id]})
1041 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1043 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1046 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1047 from(activity in query, where: activity.id != ^id)
1050 defp exclude_id(query, _), do: query
1052 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1054 defp maybe_preload_objects(query, _) do
1056 |> Activity.with_preloaded_object()
1059 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1061 defp maybe_preload_bookmarks(query, opts) do
1063 |> Activity.with_preloaded_bookmark(opts[:user])
1066 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1068 |> Activity.with_preloaded_report_notes()
1071 defp maybe_preload_report_notes(query, _), do: query
1073 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1075 defp maybe_set_thread_muted_field(query, opts) do
1077 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1080 defp maybe_order(query, %{order: :desc}) do
1082 |> order_by(desc: :id)
1085 defp maybe_order(query, %{order: :asc}) do
1087 |> order_by(asc: :id)
1090 defp maybe_order(query, _), do: query
1092 defp fetch_activities_query_ap_ids_ops(opts) do
1093 source_user = opts[:muting_user]
1094 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1096 ap_id_relationships =
1097 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1098 [:block | ap_id_relationships]
1103 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1105 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1106 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1108 restrict_muted_reblogs_opts =
1109 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1111 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1114 def fetch_activities_query(recipients, opts \\ %{}) do
1115 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1116 fetch_activities_query_ap_ids_ops(opts)
1119 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1123 |> maybe_preload_objects(opts)
1124 |> maybe_preload_bookmarks(opts)
1125 |> maybe_preload_report_notes(opts)
1126 |> maybe_set_thread_muted_field(opts)
1127 |> maybe_order(opts)
1128 |> restrict_recipients(recipients, opts[:user])
1129 |> restrict_replies(opts)
1130 |> restrict_tag(opts)
1131 |> restrict_tag_reject(opts)
1132 |> restrict_tag_all(opts)
1133 |> restrict_since(opts)
1134 |> restrict_local(opts)
1135 |> restrict_actor(opts)
1136 |> restrict_type(opts)
1137 |> restrict_state(opts)
1138 |> restrict_favorited_by(opts)
1139 |> restrict_blocked(restrict_blocked_opts)
1140 |> restrict_muted(restrict_muted_opts)
1141 |> restrict_media(opts)
1142 |> restrict_visibility(opts)
1143 |> restrict_thread_visibility(opts, config)
1144 |> restrict_reblogs(opts)
1145 |> restrict_pinned(opts)
1146 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1147 |> restrict_instance(opts)
1148 |> restrict_announce_object_actor(opts)
1149 |> Activity.restrict_deactivated_users()
1150 |> exclude_poll_votes(opts)
1151 |> exclude_chat_messages(opts)
1152 |> exclude_invisible_actors(opts)
1153 |> exclude_visibility(opts)
1156 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1157 list_memberships = Pleroma.List.memberships(opts[:user])
1159 fetch_activities_query(recipients ++ list_memberships, opts)
1160 |> Pagination.fetch_paginated(opts, pagination)
1162 |> maybe_update_cc(list_memberships, opts[:user])
1166 Fetch favorites activities of user with order by sort adds to favorites
1168 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1169 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1171 |> Activity.Queries.by_actor()
1172 |> Activity.Queries.by_type("Like")
1173 |> Activity.with_joined_object()
1174 |> Object.with_joined_activity()
1175 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1176 |> order_by([like, _, _], desc_nulls_last: like.id)
1177 |> Pagination.fetch_paginated(
1178 Map.merge(params, %{skip_order: true}),
1183 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1184 Enum.map(activities, fn
1185 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1186 if Enum.any?(bcc, &(&1 in list_memberships)) do
1187 update_in(activity.data["cc"], &[user_ap_id | &1])
1197 defp maybe_update_cc(activities, _, _), do: activities
1199 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1200 from(activity in query,
1202 fragment("? && ?", activity.recipients, ^recipients) or
1203 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1204 ^Constants.as_public() in activity.recipients)
1208 def fetch_activities_bounded(
1210 recipients_with_public,
1212 pagination \\ :keyset
1214 fetch_activities_query([], opts)
1215 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1216 |> Pagination.fetch_paginated(opts, pagination)
1220 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1221 def upload(file, opts \\ []) do
1222 with {:ok, data} <- Upload.store(file, opts) do
1223 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1225 Repo.insert(%Object{data: obj_data})
1229 @spec get_actor_url(any()) :: binary() | nil
1230 defp get_actor_url(url) when is_binary(url), do: url
1231 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1233 defp get_actor_url(url) when is_list(url) do
1239 defp get_actor_url(_url), do: nil
1241 defp object_to_user_data(data) do
1243 data["icon"]["url"] &&
1246 "url" => [%{"href" => data["icon"]["url"]}]
1250 data["image"]["url"] &&
1253 "url" => [%{"href" => data["image"]["url"]}]
1258 |> Map.get("attachment", [])
1259 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1260 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1264 |> Map.get("tag", [])
1266 %{"type" => "Emoji"} -> true
1269 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1270 {String.trim(name, ":"), url}
1273 locked = data["manuallyApprovesFollowers"] || false
1274 data = Transmogrifier.maybe_fix_user_object(data)
1275 discoverable = data["discoverable"] || false
1276 invisible = data["invisible"] || false
1277 actor_type = data["type"] || "Person"
1280 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1281 data["publicKey"]["publicKeyPem"]
1287 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1288 data["endpoints"]["sharedInbox"]
1295 uri: get_actor_url(data["url"]),
1301 discoverable: discoverable,
1302 invisible: invisible,
1305 follower_address: data["followers"],
1306 following_address: data["following"],
1307 bio: data["summary"],
1308 actor_type: actor_type,
1309 also_known_as: Map.get(data, "alsoKnownAs", []),
1310 public_key: public_key,
1311 inbox: data["inbox"],
1312 shared_inbox: shared_inbox
1315 # nickname can be nil because of virtual actors
1316 if data["preferredUsername"] do
1320 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1323 Map.put(user_data, :nickname, nil)
1327 def fetch_follow_information_for_user(user) do
1328 with {:ok, following_data} <-
1329 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1330 {:ok, hide_follows} <- collection_private(following_data),
1331 {:ok, followers_data} <-
1332 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1333 {:ok, hide_followers} <- collection_private(followers_data) do
1336 hide_follows: hide_follows,
1337 follower_count: normalize_counter(followers_data["totalItems"]),
1338 following_count: normalize_counter(following_data["totalItems"]),
1339 hide_followers: hide_followers
1342 {:error, _} = e -> e
1347 defp normalize_counter(counter) when is_integer(counter), do: counter
1348 defp normalize_counter(_), do: 0
1350 def maybe_update_follow_information(user_data) do
1351 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1352 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1354 {:collections_available,
1355 !!(user_data[:following_address] && user_data[:follower_address])},
1357 fetch_follow_information_for_user(user_data) do
1358 info = Map.merge(user_data[:info] || %{}, info)
1361 |> Map.put(:info, info)
1363 {:user_type_check, false} ->
1366 {:collections_available, false} ->
1369 {:enabled, false} ->
1374 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1381 defp collection_private(%{"first" => %{"type" => type}})
1382 when type in ["CollectionPage", "OrderedCollectionPage"],
1385 defp collection_private(%{"first" => first}) do
1386 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1387 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1390 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1391 {:error, _} = e -> e
1396 defp collection_private(_data), do: {:ok, true}
1398 def user_data_from_user_object(data) do
1399 with {:ok, data} <- MRF.filter(data) do
1400 {:ok, object_to_user_data(data)}
1406 def fetch_and_prepare_user_from_ap_id(ap_id) do
1407 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1408 {:ok, data} <- user_data_from_user_object(data) do
1409 {:ok, maybe_update_follow_information(data)}
1411 {:error, "Object has been deleted" = e} ->
1412 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1416 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1421 def make_user_from_ap_id(ap_id) do
1422 user = User.get_cached_by_ap_id(ap_id)
1424 if user && !User.ap_enabled?(user) do
1425 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1427 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1430 |> User.remote_user_changeset(data)
1431 |> User.update_and_set_cache()
1434 |> User.remote_user_changeset()
1442 def make_user_from_nickname(nickname) do
1443 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1444 make_user_from_ap_id(ap_id)
1446 _e -> {:error, "No AP id in WebFinger"}
1450 # filter out broken threads
1451 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1452 entire_thread_visible_for_user?(activity, user)
1455 # do post-processing on a specific activity
1456 def contain_activity(%Activity{} = activity, %User{} = user) do
1457 contain_broken_threads(activity, user)
1460 def fetch_direct_messages_query do
1462 |> restrict_type(%{type: "Create"})
1463 |> restrict_visibility(%{visibility: "direct"})
1464 |> order_by([activity], asc: activity.id)