1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
54 defp check_actor_is_active(nil), do: true
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
63 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
64 limit = Config.get([:instance, :remote_limit])
65 String.length(content) <= limit
68 defp check_remote_limit(_), do: true
70 def increase_note_count_if_public(actor, object) do
71 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
74 def decrease_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
87 defp increase_replies_count_if_reply(_create_data), do: :noop
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
102 Repo.insert(%Activity{
105 recipients: recipients,
106 actor: object["actor"]
108 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
135 %Activity{} = activity ->
141 {:containment, _} = error ->
144 {:error, _} = error ->
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
152 recipients: recipients,
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
167 defp insert_activity_with_expiration(data, local, recipients) do
171 actor: data["actor"],
172 recipients: recipients
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
186 stream_out_participations(participations)
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
211 defp get_participations({:ok, conversation}) do
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
217 defp get_participations(_), do: []
219 def stream_out_participations(participations) do
222 |> Repo.preload(:user)
224 Streamer.stream("participation", participations)
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
243 def stream_out_participations(_, _), do: :noop
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
252 def stream_out(_activity) do
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
272 %{to: to, actor: actor, published: published, context: context, object: object},
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
285 {:quick_insert, true, activity} ->
288 {:fake, true, activity} ->
292 Repo.rollback(message)
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
305 %{to: to, actor: actor, published: published, context: context, object: object},
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
335 {:error, error} -> Repo.rollback(error)
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
359 additional = params[:additional] || %{}
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
365 Map.merge(additional, %{"to" => [], "cc" => []})
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
384 {:error, error} -> Repo.rollback(error)
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
479 @valid_visibilities ~w[direct unlisted public private]
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
607 godmode: params[:godmode],
608 reading_user: reading_user
610 |> user_activities_recipients()
611 |> fetch_activities(params)
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
619 godmode: params[:godmode],
620 reading_user: reading_user
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
627 defp user_activities_recipients(%{godmode: true}), do: []
629 defp user_activities_recipients(%{reading_user: reading_user}) do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
633 [Constants.as_public()]
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
643 [activity, object] in query,
646 "?->>'type' != ? or ?->>'actor' != ?",
655 defp restrict_announce_object_actor(query, _), do: query
657 defp restrict_since(query, %{since_id: ""}), do: query
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
663 defp restrict_since(query, _), do: query
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise_on_missing_preload()
669 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
676 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
677 restrict_tag_reject(query, %{tag_reject: [tag_reject]})
680 defp restrict_tag_reject(query, _), do: query
682 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
683 raise_on_missing_preload()
686 defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
688 [_activity, object] in query,
689 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
693 defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
694 restrict_tag(query, %{tag: tag})
697 defp restrict_tag_all(query, _), do: query
699 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
700 raise_on_missing_preload()
703 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
710 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
711 restrict_tag(query, %{tag: [tag]})
714 defp restrict_tag(query, _), do: query
716 defp restrict_hashtag(query, opts) do
717 [tag_any, tag_all, tag_reject] =
718 [:tag, :tag_all, :tag_reject]
719 |> Enum.map(&opts[&1])
720 |> Enum.map(&List.wrap(&1))
722 has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
728 opts[:skip_preload] ->
729 raise_on_missing_preload()
733 |> group_by_all_bindings()
734 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
735 |> maybe_restrict_hashtag_any(tag_any)
736 |> maybe_restrict_hashtag_all(tag_all)
737 |> maybe_restrict_hashtag_reject_any(tag_reject)
741 # Groups by all bindings to allow aggregation on hashtags
742 defp group_by_all_bindings(query) do
743 # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
745 Enum.count(query.aliases) == 4 ->
746 from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
748 Enum.count(query.aliases) == 3 ->
749 from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
751 Enum.count(query.aliases) == 2 ->
752 from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
755 from([a, o] in query, group_by: [a.id, o.id])
759 defp maybe_restrict_hashtag_any(query, []) do
763 defp maybe_restrict_hashtag_any(query, tags) do
767 fragment("array_agg(?) && (?)", hashtag.name, ^tags)
771 defp maybe_restrict_hashtag_all(query, []) do
775 defp maybe_restrict_hashtag_all(query, tags) do
779 fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
783 defp maybe_restrict_hashtag_reject_any(query, []) do
787 defp maybe_restrict_hashtag_reject_any(query, tags) do
791 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
795 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
796 raise_on_missing_preload()
799 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
801 |> group_by_all_bindings()
802 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
805 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
809 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
810 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
813 defp restrict_hashtag_reject_any(query, _), do: query
815 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
816 raise_on_missing_preload()
819 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
823 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
827 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
828 restrict_hashtag_any(query, %{tag: tag})
831 defp restrict_hashtag_all(query, _), do: query
833 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
834 raise_on_missing_preload()
837 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
839 [_activity, object] in query,
840 join: hashtag in assoc(object, :hashtags),
841 where: hashtag.name in ^tags
845 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
846 restrict_hashtag_any(query, %{tag: [tag]})
849 defp restrict_hashtag_any(query, _), do: query
851 defp raise_on_missing_preload do
852 raise "Can't use the child object without preloading!"
855 defp restrict_recipients(query, [], _user), do: query
857 defp restrict_recipients(query, recipients, nil) do
858 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
861 defp restrict_recipients(query, recipients, user) do
864 where: fragment("? && ?", ^recipients, activity.recipients),
865 or_where: activity.actor == ^user.ap_id
869 defp restrict_local(query, %{local_only: true}) do
870 from(activity in query, where: activity.local == true)
873 defp restrict_local(query, _), do: query
875 defp restrict_actor(query, %{actor_id: actor_id}) do
876 from(activity in query, where: activity.actor == ^actor_id)
879 defp restrict_actor(query, _), do: query
881 defp restrict_type(query, %{type: type}) when is_binary(type) do
882 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
885 defp restrict_type(query, %{type: type}) do
886 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
889 defp restrict_type(query, _), do: query
891 defp restrict_state(query, %{state: state}) do
892 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
895 defp restrict_state(query, _), do: query
897 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
899 [_activity, object] in query,
900 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
904 defp restrict_favorited_by(query, _), do: query
906 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
907 raise "Can't use the child object without preloading!"
910 defp restrict_media(query, %{only_media: true}) do
912 [activity, object] in query,
913 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
914 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
918 defp restrict_media(query, _), do: query
920 defp restrict_replies(query, %{exclude_replies: true}) do
922 [_activity, object] in query,
923 where: fragment("?->>'inReplyTo' is null", object.data)
927 defp restrict_replies(query, %{
928 reply_filtering_user: %User{} = user,
929 reply_visibility: "self"
932 [activity, object] in query,
935 "?->>'inReplyTo' is null OR ? = ANY(?)",
943 defp restrict_replies(query, %{
944 reply_filtering_user: %User{} = user,
945 reply_visibility: "following"
948 [activity, object] in query,
952 ?->>'type' != 'Create' -- This isn't a Create
953 OR ?->>'inReplyTo' is null -- this isn't a reply
954 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
955 -- unless they are the author (because authors
956 -- are also part of the recipients). This leads
957 -- to a bug that self-replies by friends won't
959 OR ? = ? -- The actor is us
963 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
972 defp restrict_replies(query, _), do: query
974 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
975 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
978 defp restrict_reblogs(query, _), do: query
980 defp restrict_muted(query, %{with_muted: true}), do: query
982 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
983 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
986 from([activity] in query,
987 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
990 "not (?->'to' \\?| ?) or ? = ?",
998 unless opts[:skip_preload] do
999 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1005 defp restrict_muted(query, _), do: query
1007 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1008 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1009 domain_blocks = user.domain_blocks || []
1011 following_ap_ids = User.get_friends_ap_ids(user)
1014 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1017 [activity, object: o] in query,
1018 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1021 "((not (? && ?)) or ? = ?)",
1022 activity.recipients,
1029 "recipients_contain_blocked_domains(?, ?) = false",
1030 activity.recipients,
1035 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1042 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1050 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1059 defp restrict_blocked(query, _), do: query
1061 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1066 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1068 ^[Constants.as_public()]
1073 defp restrict_unlisted(query, _), do: query
1075 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1076 from(activity in query, where: activity.id in ^ids)
1079 defp restrict_pinned(query, _), do: query
1081 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1082 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1088 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1096 defp restrict_muted_reblogs(query, _), do: query
1098 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1101 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1105 defp restrict_instance(query, _), do: query
1107 defp restrict_filtered(query, %{user: %User{} = user}) do
1108 case Filter.compose_regex(user) do
1113 from([activity, object] in query,
1115 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1116 activity.actor == ^user.ap_id
1121 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1122 restrict_filtered(query, %{user: user})
1125 defp restrict_filtered(query, _), do: query
1127 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1129 defp exclude_poll_votes(query, _) do
1130 if has_named_binding?(query, :object) do
1131 from([activity, object: o] in query,
1132 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1139 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1141 defp exclude_chat_messages(query, _) do
1142 if has_named_binding?(query, :object) do
1143 from([activity, object: o] in query,
1144 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1151 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1153 defp exclude_invisible_actors(query, _opts) do
1155 User.Query.build(%{invisible: true, select: [:ap_id]})
1157 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1159 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1162 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1163 from(activity in query, where: activity.id != ^id)
1166 defp exclude_id(query, _), do: query
1168 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1170 defp maybe_preload_objects(query, _) do
1172 |> Activity.with_preloaded_object()
1175 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1177 defp maybe_preload_bookmarks(query, opts) do
1179 |> Activity.with_preloaded_bookmark(opts[:user])
1182 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1184 |> Activity.with_preloaded_report_notes()
1187 defp maybe_preload_report_notes(query, _), do: query
1189 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1191 defp maybe_set_thread_muted_field(query, opts) do
1193 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1196 defp maybe_order(query, %{order: :desc}) do
1198 |> order_by(desc: :id)
1201 defp maybe_order(query, %{order: :asc}) do
1203 |> order_by(asc: :id)
1206 defp maybe_order(query, _), do: query
1208 defp fetch_activities_query_ap_ids_ops(opts) do
1209 source_user = opts[:muting_user]
1210 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1212 ap_id_relationships =
1213 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1214 [:block | ap_id_relationships]
1219 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1221 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1222 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1224 restrict_muted_reblogs_opts =
1225 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1227 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1230 def fetch_activities_query(recipients, opts \\ %{}) do
1231 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1232 fetch_activities_query_ap_ids_ops(opts)
1235 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1240 |> maybe_preload_objects(opts)
1241 |> maybe_preload_bookmarks(opts)
1242 |> maybe_preload_report_notes(opts)
1243 |> maybe_set_thread_muted_field(opts)
1244 |> maybe_order(opts)
1245 |> restrict_recipients(recipients, opts[:user])
1246 |> restrict_replies(opts)
1247 |> restrict_since(opts)
1248 |> restrict_local(opts)
1249 |> restrict_actor(opts)
1250 |> restrict_type(opts)
1251 |> restrict_state(opts)
1252 |> restrict_favorited_by(opts)
1253 |> restrict_blocked(restrict_blocked_opts)
1254 |> restrict_muted(restrict_muted_opts)
1255 |> restrict_filtered(opts)
1256 |> restrict_media(opts)
1257 |> restrict_visibility(opts)
1258 |> restrict_thread_visibility(opts, config)
1259 |> restrict_reblogs(opts)
1260 |> restrict_pinned(opts)
1261 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1262 |> restrict_instance(opts)
1263 |> restrict_announce_object_actor(opts)
1264 |> restrict_filtered(opts)
1265 |> Activity.restrict_deactivated_users()
1266 |> exclude_poll_votes(opts)
1267 |> exclude_chat_messages(opts)
1268 |> exclude_invisible_actors(opts)
1269 |> exclude_visibility(opts)
1272 Config.object_embedded_hashtags?() ->
1274 |> restrict_tag(opts)
1275 |> restrict_tag_reject(opts)
1276 |> restrict_tag_all(opts)
1278 # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
1279 Config.improved_hashtag_timeline() == :join ->
1281 |> distinct([activity], true)
1282 |> restrict_hashtag_any(opts)
1283 |> restrict_hashtag_all(opts)
1284 |> restrict_hashtag_reject_any(opts)
1287 restrict_hashtag(query, opts)
1291 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1292 list_memberships = Pleroma.List.memberships(opts[:user])
1294 fetch_activities_query(recipients ++ list_memberships, opts)
1295 |> Pagination.fetch_paginated(opts, pagination)
1297 |> maybe_update_cc(list_memberships, opts[:user])
1301 Fetch favorites activities of user with order by sort adds to favorites
1303 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1304 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1306 |> Activity.Queries.by_actor()
1307 |> Activity.Queries.by_type("Like")
1308 |> Activity.with_joined_object()
1309 |> Object.with_joined_activity()
1310 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1311 |> order_by([like, _, _], desc_nulls_last: like.id)
1312 |> Pagination.fetch_paginated(
1313 Map.merge(params, %{skip_order: true}),
1318 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1319 Enum.map(activities, fn
1320 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1321 if Enum.any?(bcc, &(&1 in list_memberships)) do
1322 update_in(activity.data["cc"], &[user_ap_id | &1])
1332 defp maybe_update_cc(activities, _, _), do: activities
1334 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1335 from(activity in query,
1337 fragment("? && ?", activity.recipients, ^recipients) or
1338 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1339 ^Constants.as_public() in activity.recipients)
1343 def fetch_activities_bounded(
1345 recipients_with_public,
1347 pagination \\ :keyset
1349 fetch_activities_query([], opts)
1350 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1351 |> Pagination.fetch_paginated(opts, pagination)
1355 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1356 def upload(file, opts \\ []) do
1357 with {:ok, data} <- Upload.store(file, opts) do
1358 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1360 Repo.insert(%Object{data: obj_data})
1364 @spec get_actor_url(any()) :: binary() | nil
1365 defp get_actor_url(url) when is_binary(url), do: url
1366 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1368 defp get_actor_url(url) when is_list(url) do
1374 defp get_actor_url(_url), do: nil
1376 defp object_to_user_data(data) do
1378 data["icon"]["url"] &&
1381 "url" => [%{"href" => data["icon"]["url"]}]
1385 data["image"]["url"] &&
1388 "url" => [%{"href" => data["image"]["url"]}]
1393 |> Map.get("attachment", [])
1394 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1395 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1399 |> Map.get("tag", [])
1401 %{"type" => "Emoji"} -> true
1404 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1405 {String.trim(name, ":"), url}
1408 is_locked = data["manuallyApprovesFollowers"] || false
1409 capabilities = data["capabilities"] || %{}
1410 accepts_chat_messages = capabilities["acceptsChatMessages"]
1411 data = Transmogrifier.maybe_fix_user_object(data)
1412 is_discoverable = data["discoverable"] || false
1413 invisible = data["invisible"] || false
1414 actor_type = data["type"] || "Person"
1417 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1418 data["publicKey"]["publicKeyPem"]
1424 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1425 data["endpoints"]["sharedInbox"]
1432 uri: get_actor_url(data["url"]),
1437 is_locked: is_locked,
1438 is_discoverable: is_discoverable,
1439 invisible: invisible,
1442 follower_address: data["followers"],
1443 following_address: data["following"],
1444 bio: data["summary"] || "",
1445 actor_type: actor_type,
1446 also_known_as: Map.get(data, "alsoKnownAs", []),
1447 public_key: public_key,
1448 inbox: data["inbox"],
1449 shared_inbox: shared_inbox,
1450 accepts_chat_messages: accepts_chat_messages
1453 # nickname can be nil because of virtual actors
1454 if data["preferredUsername"] do
1458 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1461 Map.put(user_data, :nickname, nil)
1465 def fetch_follow_information_for_user(user) do
1466 with {:ok, following_data} <-
1467 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1468 {:ok, hide_follows} <- collection_private(following_data),
1469 {:ok, followers_data} <-
1470 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1471 {:ok, hide_followers} <- collection_private(followers_data) do
1474 hide_follows: hide_follows,
1475 follower_count: normalize_counter(followers_data["totalItems"]),
1476 following_count: normalize_counter(following_data["totalItems"]),
1477 hide_followers: hide_followers
1480 {:error, _} = e -> e
1485 defp normalize_counter(counter) when is_integer(counter), do: counter
1486 defp normalize_counter(_), do: 0
1488 def maybe_update_follow_information(user_data) do
1489 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1490 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1492 {:collections_available,
1493 !!(user_data[:following_address] && user_data[:follower_address])},
1495 fetch_follow_information_for_user(user_data) do
1496 info = Map.merge(user_data[:info] || %{}, info)
1499 |> Map.put(:info, info)
1501 {:user_type_check, false} ->
1504 {:collections_available, false} ->
1507 {:enabled, false} ->
1512 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1519 defp collection_private(%{"first" => %{"type" => type}})
1520 when type in ["CollectionPage", "OrderedCollectionPage"],
1523 defp collection_private(%{"first" => first}) do
1524 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1525 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1528 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1529 {:error, _} = e -> e
1534 defp collection_private(_data), do: {:ok, true}
1536 def user_data_from_user_object(data) do
1537 with {:ok, data} <- MRF.filter(data) do
1538 {:ok, object_to_user_data(data)}
1544 def fetch_and_prepare_user_from_ap_id(ap_id) do
1545 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1546 {:ok, data} <- user_data_from_user_object(data) do
1547 {:ok, maybe_update_follow_information(data)}
1549 # If this has been deleted, only log a debug and not an error
1550 {:error, "Object has been deleted" = e} ->
1551 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1554 {:error, {:reject, reason} = e} ->
1555 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1559 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1564 def maybe_handle_clashing_nickname(data) do
1565 with nickname when is_binary(nickname) <- data[:nickname],
1566 %User{} = old_user <- User.get_by_nickname(nickname),
1567 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1569 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1575 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1576 |> User.update_and_set_cache()
1578 {:ap_id_comparison, true} ->
1580 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1588 def make_user_from_ap_id(ap_id) do
1589 user = User.get_cached_by_ap_id(ap_id)
1591 if user && !User.ap_enabled?(user) do
1592 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1594 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1597 |> User.remote_user_changeset(data)
1598 |> User.update_and_set_cache()
1600 maybe_handle_clashing_nickname(data)
1603 |> User.remote_user_changeset()
1611 def make_user_from_nickname(nickname) do
1612 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1613 make_user_from_ap_id(ap_id)
1615 _e -> {:error, "No AP id in WebFinger"}
1619 # filter out broken threads
1620 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1621 entire_thread_visible_for_user?(activity, user)
1624 # do post-processing on a specific activity
1625 def contain_activity(%Activity{} = activity, %User{} = user) do
1626 contain_broken_threads(activity, user)
1629 def fetch_direct_messages_query do
1631 |> restrict_type(%{type: "Create"})
1632 |> restrict_visibility(%{visibility: "direct"})
1633 |> order_by([activity], asc: activity.id)