1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Checks the length of an activity's embedded object content against the
# `[:instance, :remote_limit]` setting, measured with `String.length/1`.
# Returns `true` when the content fits within the limit. Any input without a
# non-nil "content" inside its "object" is accepted unconditionally.
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

defp check_remote_limit(_activity_data), do: true
# Calls `User.increase_note_count/1` for `actor` when `object` is public
# (per `is_public?/1`); otherwise returns `{:ok, actor}` without touching it.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Calls `User.decrease_note_count/1` for `actor` when `object` is public
# (per `is_public?/1`); otherwise returns `{:ok, actor}` without touching it.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
92 Object.increase_vote_count(reply_ap_id, name, actor)
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
109 Repo.insert(%Activity{
112 recipients: recipients,
113 actor: object["actor"]
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
135 recipients: recipients
138 |> maybe_create_activity_expiration()
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
147 %Activity{} = activity ->
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
155 recipients: recipients,
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
173 stream_out_participations(participations)
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
182 defp maybe_create_activity_expiration(result), do: result
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
# Returns the participations of a conversation wrapped in an `{:ok, _}` tuple.
# The association is force-reloaded so the list reflects the current database
# state. Any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_other), do: []
200 def stream_out_participations(participations) do
203 |> Repo.preload(:user)
205 Streamer.stream("participation", participations)
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
213 fetch_latest_activity_id_for_context(conversation.ap_id, %{
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
224 def stream_out_participations(_, _), do: :noop
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
233 def stream_out(_activity) do
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
253 %{to: to, actor: actor, published: published, context: context, object: object},
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
267 {:quick_insert, true, activity} ->
270 {:fake, true, activity} ->
274 Repo.rollback(message)
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
287 %{to: to, actor: actor, published: published, context: context, object: object},
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
# Builds and inserts an "Accept" activity from `params`; all of the work is
# delegated to `accept_or_reject/2`.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
# Builds and inserts a "Reject" activity from `params`; all of the work is
# delegated to `accept_or_reject/2`.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
325 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
326 local = !(params[:local] == false)
327 activity_id = params[:activity_id]
337 |> Maps.put_if_present("id", activity_id)
339 with {:ok, activity} <- insert(data, local),
340 _ <- notify_and_stream(activity),
341 :ok <- maybe_federate(activity) do
346 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
347 {:ok, Activity.t()} | {:error, any()}
348 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
355 defp do_follow(follower, followed, activity_id, local, opts) do
356 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
357 data = make_follow_data(follower, followed, activity_id)
359 with {:ok, activity} <- insert(data, local),
360 _ <- skip_notify_and_stream || notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
364 {:error, error} -> Repo.rollback(error)
368 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
369 {:ok, Activity.t()} | nil | {:error, any()}
370 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
371 with {:ok, result} <-
372 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
377 defp do_unfollow(follower, followed, activity_id, local) do
378 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
379 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
380 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
381 {:ok, activity} <- insert(unfollow_data, local),
382 _ <- notify_and_stream(activity),
383 :ok <- maybe_federate(activity) do
387 {:error, error} -> Repo.rollback(error)
391 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
392 {:ok, Activity.t()} | {:error, any()}
393 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
394 with {:ok, result} <-
395 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
400 defp do_block(blocker, blocked, activity_id, local) do
401 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
403 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
404 unfollow(blocker, blocked, nil, local)
407 block_data = make_block_data(blocker, blocked, activity_id)
409 with {:ok, activity} <- insert(block_data, local),
410 _ <- notify_and_stream(activity),
411 :ok <- maybe_federate(activity) do
414 {:error, error} -> Repo.rollback(error)
418 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
428 # only accept false as false value
429 local = !(params[:local] == false)
430 forward = !(params[:forward] == false)
432 additional = params[:additional] || %{}
436 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
438 Map.merge(additional, %{"to" => [], "cc" => []})
441 with flag_data <- make_flag_data(params, additional),
442 {:ok, activity} <- insert(flag_data, local),
443 {:ok, stripped_activity} <- strip_report_status_data(activity),
444 _ <- notify_and_stream(activity),
445 :ok <- maybe_federate(stripped_activity) do
446 User.all_superusers()
447 |> Enum.filter(fn user -> not is_nil(user.email) end)
448 |> Enum.each(fn superuser ->
450 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
451 |> Pleroma.Emails.Mailer.deliver_async()
458 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
459 def move(%User{} = origin, %User{} = target, local \\ true) do
462 "actor" => origin.ap_id,
463 "object" => origin.ap_id,
464 "target" => target.ap_id
467 with true <- origin.ap_id in target.also_known_as,
468 {:ok, activity} <- insert(params, local),
469 _ <- notify_and_stream(activity) do
470 maybe_federate(activity)
472 BackgroundWorker.enqueue("move_following", %{
473 "origin_id" => origin.id,
474 "target_id" => target.id
479 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
484 def fetch_activities_for_context_query(context, opts) do
485 public = [Constants.as_public()]
489 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
492 from(activity in Activity)
493 |> maybe_preload_objects(opts)
494 |> maybe_preload_bookmarks(opts)
495 |> maybe_set_thread_muted_field(opts)
496 |> restrict_blocked(opts)
497 |> restrict_recipients(recipients, opts[:user])
501 "?->>'type' = ? and ?->>'context' = ?",
508 |> exclude_poll_votes(opts)
510 |> order_by([activity], desc: activity.id)
513 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
514 def fetch_activities_for_context(context, opts \\ %{}) do
516 |> fetch_activities_for_context_query(opts)
520 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
521 FlakeId.Ecto.CompatType.t() | nil
522 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
524 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
530 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
531 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
532 opts = Map.delete(opts, :user)
534 [Constants.as_public()]
535 |> fetch_activities_query(opts)
536 |> restrict_unlisted(opts)
537 |> Pagination.fetch_paginated(opts, pagination)
# Fetches public activities: sets `restrict_unlisted: true` on the options and
# hands over to `fetch_public_or_unlisted_activities/2`.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only_opts, pagination)
end
547 @valid_visibilities ~w[direct unlisted public private]
549 defp restrict_visibility(query, %{visibility: visibility})
550 when is_list(visibility) do
551 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
556 "activity_visibility(?, ?, ?) = ANY (?)",
564 Logger.error("Could not restrict visibility to #{visibility}")
568 defp restrict_visibility(query, %{visibility: visibility})
569 when visibility in @valid_visibilities do
573 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
577 defp restrict_visibility(_query, %{visibility: visibility})
578 when visibility not in @valid_visibilities do
579 Logger.error("Could not restrict visibility to #{visibility}")
582 defp restrict_visibility(query, _visibility), do: query
584 defp exclude_visibility(query, %{exclude_visibilities: visibility})
585 when is_list(visibility) do
586 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
591 "activity_visibility(?, ?, ?) = ANY (?)",
599 Logger.error("Could not exclude visibility to #{visibility}")
604 defp exclude_visibility(query, %{exclude_visibilities: visibility})
605 when visibility in @valid_visibilities do
610 "activity_visibility(?, ?, ?) = ?",
619 defp exclude_visibility(query, %{exclude_visibilities: visibility})
620 when visibility not in [nil | @valid_visibilities] do
621 Logger.error("Could not exclude visibility to #{visibility}")
625 defp exclude_visibility(query, _visibility), do: query
627 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
630 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
633 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
636 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
640 defp restrict_thread_visibility(query, _, _), do: query
642 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
645 |> Map.put(:user, reading_user)
646 |> Map.put(:actor_id, user.ap_id)
649 godmode: params[:godmode],
650 reading_user: reading_user
652 |> user_activities_recipients()
653 |> fetch_activities(params)
657 def fetch_user_activities(user, reading_user, params \\ %{}) do
660 |> Map.put(:type, ["Create", "Announce"])
661 |> Map.put(:user, reading_user)
662 |> Map.put(:actor_id, user.ap_id)
663 |> Map.put(:pinned_activity_ids, user.pinned_activities)
666 if User.blocks?(reading_user, user) do
670 |> Map.put(:blocking_user, reading_user)
671 |> Map.put(:muting_user, reading_user)
675 godmode: params[:godmode],
676 reading_user: reading_user
678 |> user_activities_recipients()
679 |> fetch_activities(params)
683 def fetch_statuses(reading_user, params) do
684 params = Map.put(params, :type, ["Create", "Announce"])
687 godmode: params[:godmode],
688 reading_user: reading_user
690 |> user_activities_recipients()
691 |> fetch_activities(params, :offset)
695 defp user_activities_recipients(%{godmode: true}), do: []
697 defp user_activities_recipients(%{reading_user: reading_user}) do
699 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
701 [Constants.as_public()]
705 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
706 raise "Can't use the child object without preloading!"
709 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
711 [activity, object] in query,
714 "?->>'type' != ? or ?->>'actor' != ?",
723 defp restrict_announce_object_actor(query, _), do: query
# Restricts the query to activities whose id is greater than `since_id`.
#
# A blank `since_id` (`""` or `nil`) means "no lower bound" and leaves the
# query untouched. `nil` has to be filtered out here because Ecto refuses to
# compare a column against an interpolated `nil` and raises instead.
# Options without a `:since_id` key leave the query untouched as well.
defp restrict_since(query, %{since_id: since_id}) when since_id in [nil, ""], do: query

defp restrict_since(query, %{since_id: since_id}) do
  from(activity in query, where: activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
# Excludes activities whose object "tag" contains any of the tags listed in
# `:tag_reject` (jsonb `?|` operator on the second query binding).
#
# The filter reads the joined object, so combining it with
# `skip_preload: true` is a programming error and raises. An empty or missing
# `:tag_reject` leaves the query untouched.
defp restrict_tag_reject(_query, %{tag_reject: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts), do: query
# Keeps only activities whose object "tag" contains every tag listed in
# `:tag_all` (jsonb `?&` operator on the second query binding).
#
# The filter reads the joined object, so combining it with
# `skip_preload: true` is a programming error and raises. An empty or missing
# `:tag_all` leaves the query untouched.
defp restrict_tag_all(_query, %{tag_all: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts), do: query
# Keeps only activities whose object "tag" matches `:tag`.
#
#   * a list matches when any of its entries is present (jsonb `?|`)
#   * a binary matches when that single tag is present (jsonb `?`)
#
# The filter reads the joined object, so combining it with
# `skip_preload: true` is a programming error and raises. Options without a
# usable `:tag` leave the query untouched.
defp restrict_tag(_query, %{tag: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _opts), do: query
# Restricts the query to activities addressed to at least one of `recipients`
# (array overlap, `&&`, against the activity's recipients column).
#
#   * an empty recipient list applies no restriction
#   * without a user, only the overlap condition is added
#   * with a user, activities authored by that user are OR-ed in as well
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [a], fragment("? && ?", ^recipients, a.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([a], fragment("? && ?", ^recipients, a.recipients))
  |> or_where([a], a.actor == ^user.ap_id)
end
# With `local_only: true`, keeps only activities flagged as local; any other
# options leave the query untouched.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [a], a.local == true)

defp restrict_local(query, _opts), do: query
# When `:actor_id` is given, keeps only activities whose actor equals it;
# otherwise the query is returned untouched.
defp restrict_actor(query, %{actor_id: wanted_actor}),
  do: where(query, [a], a.actor == ^wanted_actor)

defp restrict_actor(query, _opts), do: query
# Filters on the "type" stored in the activity data.
#
#   * a binary `:type` must match exactly
#   * any other `:type` value is passed to `ANY(?)`, so it is expected to be a
#     list of acceptable types
#
# Options without `:type` leave the query untouched.
defp restrict_type(query, %{type: single_type}) when is_binary(single_type) do
  where(query, [a], fragment("?->>'type' = ?", a.data, ^single_type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [a], fragment("?->>'type' = ANY(?)", a.data, ^types))
end

defp restrict_type(query, _opts), do: query
# When `:state` is given, keeps only activities whose data "state" equals it;
# otherwise the query is returned untouched.
defp restrict_state(query, %{state: wanted_state}),
  do: where(query, [a], fragment("?->>'state' = ?", a.data, ^wanted_state))

defp restrict_state(query, _opts), do: query
# Keeps only activities whose object lists `ap_id` among its "likes"
# (jsonb `?` operator on the second query binding).
#
# The filter reads the joined object, so, like the tag and media filters in
# this module, it refuses to run with `skip_preload: true`. Without the join
# the second positional binding either does not exist or points at an
# unrelated table. Options without `:favorited_by` leave the query untouched.
defp restrict_favorited_by(_query, %{favorited_by: _ap_id, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  from(
    [_activity, object] in query,
    where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps only "Create" activities whose object
# "attachment" is not equal to the empty list.
#
# The filter reads the joined object, so combining `:only_media` with
# `skip_preload: true` is a programming error and raises. Any other options
# leave the query untouched.
defp restrict_media(_query, %{only_media: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
844 defp restrict_replies(query, %{exclude_replies: true}) do
846 [_activity, object] in query,
847 where: fragment("?->>'inReplyTo' is null", object.data)
851 defp restrict_replies(query, %{
852 reply_filtering_user: user,
853 reply_visibility: "self"
856 [activity, object] in query,
859 "?->>'inReplyTo' is null OR ? = ANY(?)",
867 defp restrict_replies(query, %{
868 reply_filtering_user: user,
869 reply_visibility: "following"
872 [activity, object] in query,
875 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
877 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
886 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose data "type" is
# 'Announce'; any other options leave the query untouched.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [a], fragment("?->>'type' != 'Announce'", a.data))

defp restrict_reblogs(query, _opts), do: query
894 defp restrict_muted(query, %{with_muted: true}), do: query
896 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
897 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
900 from([activity] in query,
901 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
902 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
905 unless opts[:skip_preload] do
906 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
912 defp restrict_muted(query, _), do: query
914 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
915 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
916 domain_blocks = user.domain_blocks || []
918 following_ap_ids = User.get_friends_ap_ids(user)
921 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
924 [activity, object: o] in query,
925 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
926 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
929 "recipients_contain_blocked_domains(?, ?) = false",
935 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
942 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
950 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
959 defp restrict_blocked(query, _), do: query
961 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
966 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
968 ^[Constants.as_public()]
973 defp restrict_unlisted(query, _), do: query
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`. Both keys must be present for the filter to apply;
# otherwise the query is returned untouched.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}),
  do: where(query, [a], a.id in ^pinned_ids)

defp restrict_pinned(query, _opts), do: query
981 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
982 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
988 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
996 defp restrict_muted_reblogs(query, _), do: query
998 defp restrict_instance(query, %{instance: instance}) do
1003 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1007 from(activity in query, where: activity.actor in ^users)
1010 defp restrict_instance(query, _), do: query
1012 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1014 defp exclude_poll_votes(query, _) do
1015 if has_named_binding?(query, :object) do
1016 from([activity, object: o] in query,
1017 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1024 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1026 defp exclude_chat_messages(query, _) do
1027 if has_named_binding?(query, :object) do
1028 from([activity, object: o] in query,
1029 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1036 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1038 defp exclude_invisible_actors(query, _opts) do
1040 User.Query.build(%{invisible: true, select: [:ap_id]})
1042 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1044 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity with the given binary `:exclude_id` from the results;
# a missing or non-binary id leaves the query untouched.
defp exclude_id(query, %{exclude_id: unwanted_id}) when is_binary(unwanted_id),
  do: where(query, [a], a.id != ^unwanted_id)

defp exclude_id(query, _opts), do: query
# Applies `Activity.with_preloaded_object/1` to the query unless the caller
# asked for `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
# Applies `Activity.with_preloaded_bookmark/2` for `opts[:user]` unless the
# caller asked for `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
# Applies `Activity.with_preloaded_report_notes/1` only when the caller opts
# in with `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
# Applies `Activity.with_set_thread_muted_field/2` unless the caller asked for
# `skip_preload: true`. The user passed along is `opts[:muting_user]`, falling
# back to `opts[:user]`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
# Orders the query by id when `:order` is `:desc` or `:asc`; any other
# options leave the ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1093 defp fetch_activities_query_ap_ids_ops(opts) do
1094 source_user = opts[:muting_user]
1095 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1097 ap_id_relationships =
1098 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1099 [:block | ap_id_relationships]
1104 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1106 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1107 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1109 restrict_muted_reblogs_opts =
1110 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1112 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1115 def fetch_activities_query(recipients, opts \\ %{}) do
1116 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1117 fetch_activities_query_ap_ids_ops(opts)
1120 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1124 |> maybe_preload_objects(opts)
1125 |> maybe_preload_bookmarks(opts)
1126 |> maybe_preload_report_notes(opts)
1127 |> maybe_set_thread_muted_field(opts)
1128 |> maybe_order(opts)
1129 |> restrict_recipients(recipients, opts[:user])
1130 |> restrict_replies(opts)
1131 |> restrict_tag(opts)
1132 |> restrict_tag_reject(opts)
1133 |> restrict_tag_all(opts)
1134 |> restrict_since(opts)
1135 |> restrict_local(opts)
1136 |> restrict_actor(opts)
1137 |> restrict_type(opts)
1138 |> restrict_state(opts)
1139 |> restrict_favorited_by(opts)
1140 |> restrict_blocked(restrict_blocked_opts)
1141 |> restrict_muted(restrict_muted_opts)
1142 |> restrict_media(opts)
1143 |> restrict_visibility(opts)
1144 |> restrict_thread_visibility(opts, config)
1145 |> restrict_reblogs(opts)
1146 |> restrict_pinned(opts)
1147 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1148 |> restrict_instance(opts)
1149 |> restrict_announce_object_actor(opts)
1150 |> Activity.restrict_deactivated_users()
1151 |> exclude_poll_votes(opts)
1152 |> exclude_chat_messages(opts)
1153 |> exclude_invisible_actors(opts)
1154 |> exclude_visibility(opts)
1157 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1158 list_memberships = Pleroma.List.memberships(opts[:user])
1160 fetch_activities_query(recipients ++ list_memberships, opts)
1161 |> Pagination.fetch_paginated(opts, pagination)
1163 |> maybe_update_cc(list_memberships, opts[:user])
Fetches the activities favourited by the user, ordered by when each one was added to favourites.
1169 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1170 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1172 |> Activity.Queries.by_actor()
1173 |> Activity.Queries.by_type("Like")
1174 |> Activity.with_joined_object()
1175 |> Object.with_joined_activity()
1176 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1177 |> order_by([like, _, _], desc_nulls_last: like.id)
1178 |> Pagination.fetch_paginated(
1179 Map.merge(params, %{skip_order: true}),
1184 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1185 Enum.map(activities, fn
1186 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1187 if Enum.any?(bcc, &(&1 in list_memberships)) do
1188 update_in(activity.data["cc"], &[user_ap_id | &1])
1198 defp maybe_update_cc(activities, _, _), do: activities
1200 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1201 from(activity in query,
1203 fragment("? && ?", activity.recipients, ^recipients) or
1204 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1205 ^Constants.as_public() in activity.recipients)
1209 def fetch_activities_bounded(
1211 recipients_with_public,
1213 pagination \\ :keyset
1215 fetch_activities_query([], opts)
1216 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1217 |> Pagination.fetch_paginated(opts, pagination)
1221 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1222 def upload(file, opts \\ []) do
1223 with {:ok, data} <- Upload.store(file, opts) do
1224 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1226 Repo.insert(%Object{data: obj_data})
1230 @spec get_actor_url(any()) :: binary() | nil
1231 defp get_actor_url(url) when is_binary(url), do: url
1232 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1234 defp get_actor_url(url) when is_list(url) do
1240 defp get_actor_url(_url), do: nil
1242 defp object_to_user_data(data) do
1244 data["icon"]["url"] &&
1247 "url" => [%{"href" => data["icon"]["url"]}]
1251 data["image"]["url"] &&
1254 "url" => [%{"href" => data["image"]["url"]}]
1259 |> Map.get("attachment", [])
1260 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1261 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1265 |> Map.get("tag", [])
1267 %{"type" => "Emoji"} -> true
1270 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1271 {String.trim(name, ":"), url}
1274 locked = data["manuallyApprovesFollowers"] || false
1275 data = Transmogrifier.maybe_fix_user_object(data)
1276 discoverable = data["discoverable"] || false
1277 invisible = data["invisible"] || false
1278 actor_type = data["type"] || "Person"
1281 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1282 data["publicKey"]["publicKeyPem"]
1288 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1289 data["endpoints"]["sharedInbox"]
1296 uri: get_actor_url(data["url"]),
1302 discoverable: discoverable,
1303 invisible: invisible,
1306 follower_address: data["followers"],
1307 following_address: data["following"],
1308 bio: data["summary"],
1309 actor_type: actor_type,
1310 also_known_as: Map.get(data, "alsoKnownAs", []),
1311 public_key: public_key,
1312 inbox: data["inbox"],
1313 shared_inbox: shared_inbox
1316 # nickname can be nil because of virtual actors
1317 if data["preferredUsername"] do
1321 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1324 Map.put(user_data, :nickname, nil)
1328 def fetch_follow_information_for_user(user) do
1329 with {:ok, following_data} <-
1330 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1331 {:ok, hide_follows} <- collection_private(following_data),
1332 {:ok, followers_data} <-
1333 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1334 {:ok, hide_followers} <- collection_private(followers_data) do
1337 hide_follows: hide_follows,
1338 follower_count: normalize_counter(followers_data["totalItems"]),
1339 following_count: normalize_counter(following_data["totalItems"]),
1340 hide_followers: hide_followers
1343 {:error, _} = e -> e
# Returns `value` unchanged when it is an integer and `0` for anything else
# (for example a missing "totalItems" field).
defp normalize_counter(value) do
  if is_integer(value), do: value, else: 0
end
1351 def maybe_update_follow_information(user_data) do
1352 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1353 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1355 {:collections_available,
1356 !!(user_data[:following_address] && user_data[:follower_address])},
1358 fetch_follow_information_for_user(user_data) do
1359 info = Map.merge(user_data[:info] || %{}, info)
1362 |> Map.put(:info, info)
1364 {:user_type_check, false} ->
1367 {:collections_available, false} ->
1370 {:enabled, false} ->
1375 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1382 defp collection_private(%{"first" => %{"type" => type}})
1383 when type in ["CollectionPage", "OrderedCollectionPage"],
1386 defp collection_private(%{"first" => first}) do
1387 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1388 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1391 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1392 {:error, _} = e -> e
1397 defp collection_private(_data), do: {:ok, true}
1399 def user_data_from_user_object(data) do
1400 with {:ok, data} <- MRF.filter(data) do
1401 {:ok, object_to_user_data(data)}
1407 def fetch_and_prepare_user_from_ap_id(ap_id) do
1408 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1409 {:ok, data} <- user_data_from_user_object(data) do
1410 {:ok, maybe_update_follow_information(data)}
1412 {:error, "Object has been deleted" = e} ->
1413 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1417 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1422 def make_user_from_ap_id(ap_id) do
1423 user = User.get_cached_by_ap_id(ap_id)
1425 if user && !User.ap_enabled?(user) do
1426 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1428 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1431 |> User.remote_user_changeset(data)
1432 |> User.update_and_set_cache()
1435 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and, when the result carries a non-nil
# "ap_id", builds the user from it with `make_user_from_ap_id/1`. Any other
# WebFinger result is reported as `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Thread containment check: returns the result of
# `entire_thread_visible_for_user?/2` for the activity and the viewing user.
defp contain_broken_threads(%Activity{} = activity, %User{} = viewer),
  do: entire_thread_visible_for_user?(activity, viewer)
# Post-processing hook for a single activity as seen by `user`; currently it
# only runs the broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1461 def fetch_direct_messages_query do
1463 |> restrict_type(%{type: "Create"})
1464 |> restrict_visibility(%{visibility: "direct"})
1465 |> order_by([activity], asc: activity.id)