1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
55 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
56 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
58 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
59 case User.get_cached_by_ap_id(actor) do
60 %User{is_active: true} -> true
65 defp check_actor_can_insert(_), do: true
67 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
68 limit = Config.get([:instance, :remote_limit])
69 String.length(content) <= limit
72 defp check_remote_limit(_), do: true
74 def increase_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78 def decrease_note_count_if_public(actor, object) do
79 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82 defp increase_replies_count_if_reply(%{
83 "object" => %{"inReplyTo" => reply_ap_id} = object,
86 if is_public?(object) do
87 Object.increase_replies_count(reply_ap_id)
91 defp increase_replies_count_if_reply(_create_data), do: :noop
93 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
95 def persist(%{"type" => type} = object, meta) when type in @object_types do
96 with {:ok, object} <- Object.create(object) do
102 def persist(object, meta) do
103 with local <- Keyword.fetch!(meta, :local),
104 {recipients, _, _} <- get_recipients(object),
106 Repo.insert(%Activity{
109 recipients: recipients,
110 actor: object["actor"]
112 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
113 {:ok, _} <- maybe_create_activity_expiration(activity) do
114 {:ok, activity, meta}
118 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
119 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
120 with nil <- Activity.normalize(map),
121 map <- lazy_put_activity_defaults(map, fake),
122 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
123 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
124 {:ok, map} <- MRF.filter(map),
125 {recipients, _, _} = get_recipients(map),
126 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
127 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
128 {:ok, map, object} <- insert_full_object(map),
129 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
130 # Splice in the child object if we have one.
131 activity = Maps.put_if_present(activity, :object, object)
133 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
134 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
139 %Activity{} = activity ->
145 {:containment, _} = error ->
148 {:error, _} = error ->
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
156 recipients: recipients,
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
163 {:remote_limit_pass, _} ->
164 {:error, :remote_limit}
171 defp insert_activity_with_expiration(data, local, recipients) do
175 actor: data["actor"],
176 recipients: recipients
179 with {:ok, activity} <- Repo.insert(struct) do
180 maybe_create_activity_expiration(activity)
184 def notify_and_stream(activity) do
185 Notification.create_notifications(activity)
187 conversation = create_or_bump_conversation(activity, activity.actor)
188 participations = get_participations(conversation)
190 stream_out_participations(participations)
193 defp maybe_create_activity_expiration(
194 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
197 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
198 activity_id: activity.id,
199 expires_at: expires_at
205 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
207 defp create_or_bump_conversation(activity, actor) do
208 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
209 %User{} = user <- User.get_cached_by_ap_id(actor) do
210 Participation.mark_as_read(user, conversation)
215 defp get_participations({:ok, conversation}) do
217 |> Repo.preload(:participations, force: true)
218 |> Map.get(:participations)
221 defp get_participations(_), do: []
223 def stream_out_participations(participations) do
226 |> Repo.preload(:user)
228 Streamer.stream("participation", participations)
232 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
233 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
234 conversation = Repo.preload(conversation, :participations)
237 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
242 if last_activity_id do
243 stream_out_participations(conversation.participations)
249 def stream_out_participations(_, _), do: :noop
252 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
253 when data_type in ["Create", "Announce", "Delete"] do
255 |> Topics.get_activity_topics()
256 |> Streamer.stream(activity)
260 def stream_out(_activity) do
264 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
265 def create(params, fake \\ false) do
266 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
271 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
272 additional = params[:additional] || %{}
273 # only accept false as false value
274 local = !(params[:local] == false)
275 published = params[:published]
276 quick_insert? = Config.get([:env]) == :benchmark
280 %{to: to, actor: actor, published: published, context: context, object: object},
284 with {:ok, activity} <- insert(create_data, local, fake),
285 {:fake, false, activity} <- {:fake, fake, activity},
286 _ <- increase_replies_count_if_reply(create_data),
287 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
288 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
289 _ <- notify_and_stream(activity),
290 :ok <- maybe_federate(activity) do
293 {:quick_insert, true, activity} ->
296 {:fake, true, activity} ->
300 Repo.rollback(message)
304 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
305 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
306 additional = params[:additional] || %{}
307 # only accept false as false value
308 local = !(params[:local] == false)
309 published = params[:published]
313 %{to: to, actor: actor, published: published, context: context, object: object},
317 with {:ok, activity} <- insert(listen_data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
325 {:ok, Activity.t()} | nil | {:error, any()}
326 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
327 with {:ok, result} <-
328 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
333 defp do_unfollow(follower, followed, activity_id, local) do
334 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
335 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
336 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
337 {:ok, activity} <- insert(unfollow_data, local),
338 _ <- notify_and_stream(activity),
339 :ok <- maybe_federate(activity) do
343 {:error, error} -> Repo.rollback(error)
347 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
349 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
363 # only accept false as false value
364 local = !(params[:local] == false)
365 forward = !(params[:forward] == false)
367 additional = params[:additional] || %{}
371 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
373 Map.merge(additional, %{"to" => [], "cc" => []})
376 with flag_data <- make_flag_data(params, additional),
377 {:ok, activity} <- insert(flag_data, local),
378 {:ok, stripped_activity} <- strip_report_status_data(activity),
379 _ <- notify_and_stream(activity),
381 maybe_federate(stripped_activity) do
382 User.all_superusers()
383 |> Enum.filter(fn user -> user.ap_id != actor end)
384 |> Enum.filter(fn user -> not is_nil(user.email) end)
385 |> Enum.each(fn superuser ->
387 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
388 |> Pleroma.Emails.Mailer.deliver_async()
393 {:error, error} -> Repo.rollback(error)
397 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
398 def move(%User{} = origin, %User{} = target, local \\ true) do
401 "actor" => origin.ap_id,
402 "object" => origin.ap_id,
403 "target" => target.ap_id
406 with true <- origin.ap_id in target.also_known_as,
407 {:ok, activity} <- insert(params, local),
408 _ <- notify_and_stream(activity) do
409 maybe_federate(activity)
411 BackgroundWorker.enqueue("move_following", %{
412 "origin_id" => origin.id,
413 "target_id" => target.id
418 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
423 def fetch_activities_for_context_query(context, opts) do
424 public = [Constants.as_public()]
428 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
431 from(activity in Activity)
432 |> maybe_preload_objects(opts)
433 |> maybe_preload_bookmarks(opts)
434 |> maybe_set_thread_muted_field(opts)
435 |> restrict_blocked(opts)
436 |> restrict_recipients(recipients, opts[:user])
437 |> restrict_filtered(opts)
441 "?->>'type' = ? and ?->>'context' = ?",
448 |> exclude_poll_votes(opts)
450 |> order_by([activity], desc: activity.id)
453 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
454 def fetch_activities_for_context(context, opts \\ %{}) do
456 |> fetch_activities_for_context_query(opts)
460 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
461 FlakeId.Ecto.CompatType.t() | nil
462 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
464 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
465 |> restrict_visibility(%{visibility: "direct"})
471 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
472 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
473 opts = Map.delete(opts, :user)
475 [Constants.as_public()]
476 |> fetch_activities_query(opts)
477 |> restrict_unlisted(opts)
478 |> Pagination.fetch_paginated(opts, pagination)
481 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
482 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
484 |> Map.put(:restrict_unlisted, true)
485 |> fetch_public_or_unlisted_activities(pagination)
488 @valid_visibilities ~w[direct unlisted public private]
490 defp restrict_visibility(query, %{visibility: visibility})
491 when is_list(visibility) do
492 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
497 "activity_visibility(?, ?, ?) = ANY (?)",
505 Logger.error("Could not restrict visibility to #{visibility}")
509 defp restrict_visibility(query, %{visibility: visibility})
510 when visibility in @valid_visibilities do
514 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
518 defp restrict_visibility(_query, %{visibility: visibility})
519 when visibility not in @valid_visibilities do
520 Logger.error("Could not restrict visibility to #{visibility}")
523 defp restrict_visibility(query, _visibility), do: query
525 defp exclude_visibility(query, %{exclude_visibilities: visibility})
526 when is_list(visibility) do
527 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
532 "activity_visibility(?, ?, ?) = ANY (?)",
540 Logger.error("Could not exclude visibility to #{visibility}")
545 defp exclude_visibility(query, %{exclude_visibilities: visibility})
546 when visibility in @valid_visibilities do
551 "activity_visibility(?, ?, ?) = ?",
560 defp exclude_visibility(query, %{exclude_visibilities: visibility})
561 when visibility not in [nil | @valid_visibilities] do
562 Logger.error("Could not exclude visibility to #{visibility}")
566 defp exclude_visibility(query, _visibility), do: query
568 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
571 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
574 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
577 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
581 defp restrict_thread_visibility(query, _, _), do: query
583 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
586 |> Map.put(:user, reading_user)
587 |> Map.put(:actor_id, user.ap_id)
590 godmode: params[:godmode],
591 reading_user: reading_user
593 |> user_activities_recipients()
594 |> fetch_activities(params)
598 def fetch_user_activities(user, reading_user, params \\ %{})
600 def fetch_user_activities(user, reading_user, %{total: true} = params) do
601 result = fetch_activities_for_user(user, reading_user, params)
603 Keyword.put(result, :items, Enum.reverse(result[:items]))
606 def fetch_user_activities(user, reading_user, params) do
608 |> fetch_activities_for_user(reading_user, params)
612 defp fetch_activities_for_user(user, reading_user, params) do
615 |> Map.put(:type, ["Create", "Announce"])
616 |> Map.put(:user, reading_user)
617 |> Map.put(:actor_id, user.ap_id)
618 |> Map.put(:pinned_activity_ids, user.pinned_activities)
621 if User.blocks?(reading_user, user) do
625 |> Map.put(:blocking_user, reading_user)
626 |> Map.put(:muting_user, reading_user)
629 pagination_type = Map.get(params, :pagination_type) || :keyset
632 godmode: params[:godmode],
633 reading_user: reading_user
635 |> user_activities_recipients()
636 |> fetch_activities(params, pagination_type)
639 def fetch_statuses(reading_user, %{total: true} = params) do
640 result = fetch_activities_for_reading_user(reading_user, params)
641 Keyword.put(result, :items, Enum.reverse(result[:items]))
644 def fetch_statuses(reading_user, params) do
646 |> fetch_activities_for_reading_user(params)
650 defp fetch_activities_for_reading_user(reading_user, params) do
651 params = Map.put(params, :type, ["Create", "Announce"])
654 godmode: params[:godmode],
655 reading_user: reading_user
657 |> user_activities_recipients()
658 |> fetch_activities(params, :offset)
661 defp user_activities_recipients(%{godmode: true}), do: []
663 defp user_activities_recipients(%{reading_user: reading_user}) do
665 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
667 [Constants.as_public()]
671 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
672 raise "Can't use the child object without preloading!"
675 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
677 [activity, object] in query,
680 "?->>'type' != ? or ?->>'actor' != ?",
689 defp restrict_announce_object_actor(query, _), do: query
691 defp restrict_since(query, %{since_id: ""}), do: query
693 defp restrict_since(query, %{since_id: since_id}) do
694 from(activity in query, where: activity.id > ^since_id)
697 defp restrict_since(query, _), do: query
699 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
700 raise "Can't use the child object without preloading!"
703 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
705 [_activity, object] in query,
706 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
710 defp restrict_tag_reject(query, _), do: query
712 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
713 raise "Can't use the child object without preloading!"
716 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
718 [_activity, object] in query,
719 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
723 defp restrict_tag_all(query, _), do: query
725 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
726 raise "Can't use the child object without preloading!"
729 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
731 [_activity, object] in query,
732 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
736 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
738 [_activity, object] in query,
739 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
743 defp restrict_tag(query, _), do: query
745 defp restrict_recipients(query, [], _user), do: query
747 defp restrict_recipients(query, recipients, nil) do
748 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
751 defp restrict_recipients(query, recipients, user) do
754 where: fragment("? && ?", ^recipients, activity.recipients),
755 or_where: activity.actor == ^user.ap_id
759 defp restrict_local(query, %{local_only: true}) do
760 from(activity in query, where: activity.local == true)
763 defp restrict_local(query, _), do: query
765 defp restrict_remote(query, %{remote: true}) do
766 from(activity in query, where: activity.local == false)
769 defp restrict_remote(query, _), do: query
771 defp restrict_actor(query, %{actor_id: actor_id}) do
772 from(activity in query, where: activity.actor == ^actor_id)
775 defp restrict_actor(query, _), do: query
777 defp restrict_type(query, %{type: type}) when is_binary(type) do
778 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
781 defp restrict_type(query, %{type: type}) do
782 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
785 defp restrict_type(query, _), do: query
787 defp restrict_state(query, %{state: state}) do
788 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
791 defp restrict_state(query, _), do: query
793 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
795 [_activity, object] in query,
796 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
800 defp restrict_favorited_by(query, _), do: query
802 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
803 raise "Can't use the child object without preloading!"
806 defp restrict_media(query, %{only_media: true}) do
808 [activity, object] in query,
809 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
810 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
814 defp restrict_media(query, _), do: query
816 defp restrict_replies(query, %{exclude_replies: true}) do
818 [_activity, object] in query,
819 where: fragment("?->>'inReplyTo' is null", object.data)
823 defp restrict_replies(query, %{
824 reply_filtering_user: %User{} = user,
825 reply_visibility: "self"
828 [activity, object] in query,
831 "?->>'inReplyTo' is null OR ? = ANY(?)",
839 defp restrict_replies(query, %{
840 reply_filtering_user: %User{} = user,
841 reply_visibility: "following"
844 [activity, object] in query,
848 ?->>'type' != 'Create' -- This isn't a Create
849 OR ?->>'inReplyTo' is null -- this isn't a reply
850 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
851 -- unless they are the author (because authors
852 -- are also part of the recipients). This leads
853 -- to a bug that self-replies by friends won't
855 OR ? = ? -- The actor is us
859 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
868 defp restrict_replies(query, _), do: query
870 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
871 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
874 defp restrict_reblogs(query, _), do: query
876 defp restrict_muted(query, %{with_muted: true}), do: query
878 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
879 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
882 from([activity] in query,
883 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
886 "not (?->'to' \\?| ?) or ? = ?",
894 unless opts[:skip_preload] do
895 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
901 defp restrict_muted(query, _), do: query
903 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
904 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
905 domain_blocks = user.domain_blocks || []
907 following_ap_ids = User.get_friends_ap_ids(user)
910 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
913 [activity, object: o] in query,
914 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
917 "((not (? && ?)) or ? = ?)",
925 "recipients_contain_blocked_domains(?, ?) = false",
931 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
938 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
946 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
955 defp restrict_blocked(query, _), do: query
957 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
962 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
964 ^[Constants.as_public()]
969 defp restrict_unlisted(query, _), do: query
971 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
972 from(activity in query, where: activity.id in ^ids)
975 defp restrict_pinned(query, _), do: query
977 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
978 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
984 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
992 defp restrict_muted_reblogs(query, _), do: query
994 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
997 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1001 defp restrict_instance(query, _), do: query
1003 defp restrict_filtered(query, %{user: %User{} = user}) do
1004 case Filter.compose_regex(user) do
1009 from([activity, object] in query,
1011 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1012 activity.actor == ^user.ap_id
1017 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1018 restrict_filtered(query, %{user: user})
1021 defp restrict_filtered(query, _), do: query
1023 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1025 defp exclude_poll_votes(query, _) do
1026 if has_named_binding?(query, :object) do
1027 from([activity, object: o] in query,
1028 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1035 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1037 defp exclude_chat_messages(query, _) do
1038 if has_named_binding?(query, :object) do
1039 from([activity, object: o] in query,
1040 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1047 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1049 defp exclude_invisible_actors(query, _opts) do
1051 User.Query.build(%{invisible: true, select: [:ap_id]})
1053 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1055 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1058 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1059 from(activity in query, where: activity.id != ^id)
1062 defp exclude_id(query, _), do: query
1064 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1066 defp maybe_preload_objects(query, _) do
1068 |> Activity.with_preloaded_object()
1071 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1073 defp maybe_preload_bookmarks(query, opts) do
1075 |> Activity.with_preloaded_bookmark(opts[:user])
1078 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1080 |> Activity.with_preloaded_report_notes()
1083 defp maybe_preload_report_notes(query, _), do: query
1085 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1087 defp maybe_set_thread_muted_field(query, opts) do
1089 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1092 defp maybe_order(query, %{order: :desc}) do
1094 |> order_by(desc: :id)
1097 defp maybe_order(query, %{order: :asc}) do
1099 |> order_by(asc: :id)
1102 defp maybe_order(query, _), do: query
1104 defp fetch_activities_query_ap_ids_ops(opts) do
1105 source_user = opts[:muting_user]
1106 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1108 ap_id_relationships =
1109 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1110 [:block | ap_id_relationships]
1115 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1117 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1118 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1120 restrict_muted_reblogs_opts =
1121 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1123 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1126 def fetch_activities_query(recipients, opts \\ %{}) do
1127 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1128 fetch_activities_query_ap_ids_ops(opts)
1131 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1135 |> maybe_preload_objects(opts)
1136 |> maybe_preload_bookmarks(opts)
1137 |> maybe_preload_report_notes(opts)
1138 |> maybe_set_thread_muted_field(opts)
1139 |> maybe_order(opts)
1140 |> restrict_recipients(recipients, opts[:user])
1141 |> restrict_replies(opts)
1142 |> restrict_tag(opts)
1143 |> restrict_tag_reject(opts)
1144 |> restrict_tag_all(opts)
1145 |> restrict_since(opts)
1146 |> restrict_local(opts)
1147 |> restrict_remote(opts)
1148 |> restrict_actor(opts)
1149 |> restrict_type(opts)
1150 |> restrict_state(opts)
1151 |> restrict_favorited_by(opts)
1152 |> restrict_blocked(restrict_blocked_opts)
1153 |> restrict_muted(restrict_muted_opts)
1154 |> restrict_filtered(opts)
1155 |> restrict_media(opts)
1156 |> restrict_visibility(opts)
1157 |> restrict_thread_visibility(opts, config)
1158 |> restrict_reblogs(opts)
1159 |> restrict_pinned(opts)
1160 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1161 |> restrict_instance(opts)
1162 |> restrict_announce_object_actor(opts)
1163 |> restrict_filtered(opts)
1164 |> Activity.restrict_deactivated_users()
1165 |> exclude_poll_votes(opts)
1166 |> exclude_chat_messages(opts)
1167 |> exclude_invisible_actors(opts)
1168 |> exclude_visibility(opts)
1171 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1172 list_memberships = Pleroma.List.memberships(opts[:user])
1174 fetch_activities_query(recipients ++ list_memberships, opts)
1175 |> Pagination.fetch_paginated(opts, pagination)
1177 |> maybe_update_cc(list_memberships, opts[:user])
1181 Fetch favorites activities of user with order by sort adds to favorites
1183 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1184 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1186 |> Activity.Queries.by_actor()
1187 |> Activity.Queries.by_type("Like")
1188 |> Activity.with_joined_object()
1189 |> Object.with_joined_activity()
1190 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1191 |> order_by([like, _, _], desc_nulls_last: like.id)
1192 |> Pagination.fetch_paginated(
1193 Map.merge(params, %{skip_order: true}),
1198 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1199 Enum.map(activities, fn
1200 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1201 if Enum.any?(bcc, &(&1 in list_memberships)) do
1202 update_in(activity.data["cc"], &[user_ap_id | &1])
1212 defp maybe_update_cc(activities, _, _), do: activities
1214 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1215 from(activity in query,
1217 fragment("? && ?", activity.recipients, ^recipients) or
1218 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1219 ^Constants.as_public() in activity.recipients)
1223 def fetch_activities_bounded(
1225 recipients_with_public,
1227 pagination \\ :keyset
1229 fetch_activities_query([], opts)
1230 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1231 |> Pagination.fetch_paginated(opts, pagination)
1235 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1236 def upload(file, opts \\ []) do
1237 with {:ok, data} <- Upload.store(file, opts) do
1238 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1240 Repo.insert(%Object{data: obj_data})
1244 @spec get_actor_url(any()) :: binary() | nil
1245 defp get_actor_url(url) when is_binary(url), do: url
1246 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1248 defp get_actor_url(url) when is_list(url) do
1254 defp get_actor_url(_url), do: nil
1256 defp object_to_user_data(data) do
1258 data["icon"]["url"] &&
1261 "url" => [%{"href" => data["icon"]["url"]}]
1265 data["image"]["url"] &&
1268 "url" => [%{"href" => data["image"]["url"]}]
1273 |> Map.get("attachment", [])
1274 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1275 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1279 |> Map.get("tag", [])
1281 %{"type" => "Emoji"} -> true
1284 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1285 {String.trim(name, ":"), url}
1288 is_locked = data["manuallyApprovesFollowers"] || false
1289 capabilities = data["capabilities"] || %{}
1290 accepts_chat_messages = capabilities["acceptsChatMessages"]
1291 data = Transmogrifier.maybe_fix_user_object(data)
1292 is_discoverable = data["discoverable"] || false
1293 invisible = data["invisible"] || false
1294 actor_type = data["type"] || "Person"
1297 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1298 data["publicKey"]["publicKeyPem"]
1304 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1305 data["endpoints"]["sharedInbox"]
1312 uri: get_actor_url(data["url"]),
1317 is_locked: is_locked,
1318 is_discoverable: is_discoverable,
1319 invisible: invisible,
1322 follower_address: data["followers"],
1323 following_address: data["following"],
1324 bio: data["summary"] || "",
1325 actor_type: actor_type,
1326 also_known_as: Map.get(data, "alsoKnownAs", []),
1327 public_key: public_key,
1328 inbox: data["inbox"],
1329 shared_inbox: shared_inbox,
1330 accepts_chat_messages: accepts_chat_messages
1333 # nickname can be nil because of virtual actors
1334 if data["preferredUsername"] do
1338 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1341 Map.put(user_data, :nickname, nil)
1345 def fetch_follow_information_for_user(user) do
1346 with {:ok, following_data} <-
1347 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1348 {:ok, hide_follows} <- collection_private(following_data),
1349 {:ok, followers_data} <-
1350 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1351 {:ok, hide_followers} <- collection_private(followers_data) do
1354 hide_follows: hide_follows,
1355 follower_count: normalize_counter(followers_data["totalItems"]),
1356 following_count: normalize_counter(following_data["totalItems"]),
1357 hide_followers: hide_followers
1360 {:error, _} = e -> e
1365 defp normalize_counter(counter) when is_integer(counter), do: counter
1366 defp normalize_counter(_), do: 0
1368 def maybe_update_follow_information(user_data) do
1369 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1370 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1372 {:collections_available,
1373 !!(user_data[:following_address] && user_data[:follower_address])},
1375 fetch_follow_information_for_user(user_data) do
1376 info = Map.merge(user_data[:info] || %{}, info)
1379 |> Map.put(:info, info)
1381 {:user_type_check, false} ->
1384 {:collections_available, false} ->
1387 {:enabled, false} ->
1392 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1399 defp collection_private(%{"first" => %{"type" => type}})
1400 when type in ["CollectionPage", "OrderedCollectionPage"],
1403 defp collection_private(%{"first" => first}) do
1404 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1405 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1408 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1409 {:error, _} = e -> e
1414 defp collection_private(_data), do: {:ok, true}
1416 def user_data_from_user_object(data) do
1417 with {:ok, data} <- MRF.filter(data) do
1418 {:ok, object_to_user_data(data)}
1424 def fetch_and_prepare_user_from_ap_id(ap_id) do
1425 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1426 {:ok, data} <- user_data_from_user_object(data) do
1427 {:ok, maybe_update_follow_information(data)}
1429 # If this has been deleted, only log a debug and not an error
1430 {:error, "Object has been deleted" = e} ->
1431 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1434 {:error, {:reject, reason} = e} ->
1435 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1439 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1444 def maybe_handle_clashing_nickname(data) do
1445 with nickname when is_binary(nickname) <- data[:nickname],
1446 %User{} = old_user <- User.get_by_nickname(nickname),
1447 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1449 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1455 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1456 |> User.update_and_set_cache()
1458 {:ap_id_comparison, true} ->
1460 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1468 def make_user_from_ap_id(ap_id) do
1469 user = User.get_cached_by_ap_id(ap_id)
1471 if user && !User.ap_enabled?(user) do
1472 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1474 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1477 |> User.remote_user_changeset(data)
1478 |> User.update_and_set_cache()
1480 maybe_handle_clashing_nickname(data)
1483 |> User.remote_user_changeset()
1491 def make_user_from_nickname(nickname) do
1492 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1493 make_user_from_ap_id(ap_id)
1495 _e -> {:error, "No AP id in WebFinger"}
1499 # filter out broken threads
1500 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1501 entire_thread_visible_for_user?(activity, user)
1504 # do post-processing on a specific activity
1505 def contain_activity(%Activity{} = activity, %User{} = user) do
1506 contain_broken_threads(activity, user)
1509 def fetch_direct_messages_query do
1511 |> restrict_type(%{type: "Create"})
1512 |> restrict_visibility(%{visibility: "direct"})
1513 |> order_by([activity], asc: activity.id)