1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{deactivated: deactivated} -> not deactivated
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
389 {:error, error} -> Repo.rollback(error)
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
437 "?->>'type' = ? and ?->>'context' = ?",
444 |> exclude_poll_votes(opts)
446 |> order_by([activity], desc: activity.id)
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(opts)
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
484 @valid_visibilities ~w[direct unlisted public private]
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
493 "activity_visibility(?, ?, ?) = ANY (?)",
501 Logger.error("Could not restrict visibility to #{visibility}")
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
519 defp restrict_visibility(query, _visibility), do: query
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 "activity_visibility(?, ?, ?) = ANY (?)",
536 Logger.error("Could not exclude visibility to #{visibility}")
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
547 "activity_visibility(?, ?, ?) = ?",
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
562 defp exclude_visibility(query, _visibility), do: query
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
577 defp restrict_thread_visibility(query, _, _), do: query
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
586 godmode: params[:godmode],
587 reading_user: reading_user
589 |> user_activities_recipients()
590 |> fetch_activities(params)
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
603 if User.blocks?(reading_user, user) do
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
611 pagination_type = Map.get(params, :pagination_type) || :keyset
614 godmode: params[:godmode],
615 reading_user: reading_user
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
626 godmode: params[:godmode],
627 reading_user: reading_user
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
634 defp user_activities_recipients(%{godmode: true}), do: []
636 defp user_activities_recipients(%{reading_user: reading_user}) do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
640 [Constants.as_public()]
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
650 [activity, object] in query,
653 "?->>'type' != ? or ?->>'actor' != ?",
662 defp restrict_announce_object_actor(query, _), do: query
664 defp restrict_since(query, %{since_id: ""}), do: query
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
670 defp restrict_since(query, _), do: query
672 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
673 raise_on_missing_preload()
676 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
678 [_activity, object] in query,
679 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
683 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
684 restrict_tag_reject(query, %{tag_reject: [tag_reject]})
687 defp restrict_tag_reject(query, _), do: query
689 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
690 raise_on_missing_preload()
693 defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
695 [_activity, object] in query,
696 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
700 defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
701 restrict_tag(query, %{tag: tag})
704 defp restrict_tag_all(query, _), do: query
706 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
707 raise_on_missing_preload()
710 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
712 [_activity, object] in query,
713 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
717 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
718 restrict_tag(query, %{tag: [tag]})
721 defp restrict_tag(query, _), do: query
723 defp restrict_hashtag(query, opts) do
724 [tag_any, tag_all, tag_reject] =
725 [:tag, :tag_all, :tag_reject]
726 |> Enum.map(&opts[&1])
727 |> Enum.map(&List.wrap(&1))
729 has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
735 opts[:skip_preload] ->
736 raise_on_missing_preload()
740 |> group_by_all_bindings()
741 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
742 |> maybe_restrict_hashtag_any(tag_any)
743 |> maybe_restrict_hashtag_all(tag_all)
744 |> maybe_restrict_hashtag_reject_any(tag_reject)
748 # Groups by all bindings to allow aggregation on hashtags
749 defp group_by_all_bindings(query) do
750 # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
752 Enum.count(query.aliases) == 4 ->
753 from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
755 Enum.count(query.aliases) == 3 ->
756 from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
758 Enum.count(query.aliases) == 2 ->
759 from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
762 from([a, o] in query, group_by: [a.id, o.id])
766 defp maybe_restrict_hashtag_any(query, []) do
770 defp maybe_restrict_hashtag_any(query, tags) do
774 fragment("array_agg(?) && (?)", hashtag.name, ^tags)
778 defp maybe_restrict_hashtag_all(query, []) do
782 defp maybe_restrict_hashtag_all(query, tags) do
786 fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
790 defp maybe_restrict_hashtag_reject_any(query, []) do
794 defp maybe_restrict_hashtag_reject_any(query, tags) do
798 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
802 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
803 raise_on_missing_preload()
806 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
808 |> group_by_all_bindings()
809 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
812 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
816 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
817 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
820 defp restrict_hashtag_reject_any(query, _), do: query
822 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
823 raise_on_missing_preload()
826 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
830 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
834 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
835 restrict_hashtag_any(query, %{tag: tag})
838 defp restrict_hashtag_all(query, _), do: query
840 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
841 raise_on_missing_preload()
844 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
846 [_activity, object] in query,
847 join: hashtag in assoc(object, :hashtags),
848 where: hashtag.name in ^tags
852 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
853 restrict_hashtag_any(query, %{tag: [tag]})
856 defp restrict_hashtag_any(query, _), do: query
858 defp raise_on_missing_preload do
859 raise "Can't use the child object without preloading!"
862 defp restrict_recipients(query, [], _user), do: query
864 defp restrict_recipients(query, recipients, nil) do
865 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
868 defp restrict_recipients(query, recipients, user) do
871 where: fragment("? && ?", ^recipients, activity.recipients),
872 or_where: activity.actor == ^user.ap_id
876 defp restrict_local(query, %{local_only: true}) do
877 from(activity in query, where: activity.local == true)
880 defp restrict_local(query, _), do: query
882 defp restrict_actor(query, %{actor_id: actor_id}) do
883 from(activity in query, where: activity.actor == ^actor_id)
886 defp restrict_actor(query, _), do: query
888 defp restrict_type(query, %{type: type}) when is_binary(type) do
889 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
892 defp restrict_type(query, %{type: type}) do
893 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
896 defp restrict_type(query, _), do: query
898 defp restrict_state(query, %{state: state}) do
899 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
902 defp restrict_state(query, _), do: query
904 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
906 [_activity, object] in query,
907 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
911 defp restrict_favorited_by(query, _), do: query
913 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
914 raise "Can't use the child object without preloading!"
917 defp restrict_media(query, %{only_media: true}) do
919 [activity, object] in query,
920 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
921 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
925 defp restrict_media(query, _), do: query
927 defp restrict_replies(query, %{exclude_replies: true}) do
929 [_activity, object] in query,
930 where: fragment("?->>'inReplyTo' is null", object.data)
934 defp restrict_replies(query, %{
935 reply_filtering_user: %User{} = user,
936 reply_visibility: "self"
939 [activity, object] in query,
942 "?->>'inReplyTo' is null OR ? = ANY(?)",
950 defp restrict_replies(query, %{
951 reply_filtering_user: %User{} = user,
952 reply_visibility: "following"
955 [activity, object] in query,
959 ?->>'type' != 'Create' -- This isn't a Create
960 OR ?->>'inReplyTo' is null -- this isn't a reply
961 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
962 -- unless they are the author (because authors
963 -- are also part of the recipients). This leads
964 -- to a bug that self-replies by friends won't
966 OR ? = ? -- The actor is us
970 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
979 defp restrict_replies(query, _), do: query
981 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
982 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
985 defp restrict_reblogs(query, _), do: query
987 defp restrict_muted(query, %{with_muted: true}), do: query
989 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
990 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
993 from([activity] in query,
994 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
997 "not (?->'to' \\?| ?) or ? = ?",
1005 unless opts[:skip_preload] do
1006 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1012 defp restrict_muted(query, _), do: query
1014 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1015 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1016 domain_blocks = user.domain_blocks || []
1018 following_ap_ids = User.get_friends_ap_ids(user)
1021 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1024 [activity, object: o] in query,
1025 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1028 "((not (? && ?)) or ? = ?)",
1029 activity.recipients,
1036 "recipients_contain_blocked_domains(?, ?) = false",
1037 activity.recipients,
1042 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1049 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1057 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1066 defp restrict_blocked(query, _), do: query
1068 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1073 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1075 ^[Constants.as_public()]
1080 defp restrict_unlisted(query, _), do: query
1082 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1083 from(activity in query, where: activity.id in ^ids)
1086 defp restrict_pinned(query, _), do: query
1088 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1089 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1095 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1103 defp restrict_muted_reblogs(query, _), do: query
1105 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1108 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1112 defp restrict_instance(query, _), do: query
1114 defp restrict_filtered(query, %{user: %User{} = user}) do
1115 case Filter.compose_regex(user) do
1120 from([activity, object] in query,
1122 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1123 activity.actor == ^user.ap_id
1128 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1129 restrict_filtered(query, %{user: user})
1132 defp restrict_filtered(query, _), do: query
1134 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1136 defp exclude_poll_votes(query, _) do
1137 if has_named_binding?(query, :object) do
1138 from([activity, object: o] in query,
1139 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1146 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1148 defp exclude_chat_messages(query, _) do
1149 if has_named_binding?(query, :object) do
1150 from([activity, object: o] in query,
1151 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1158 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1160 defp exclude_invisible_actors(query, _opts) do
1162 User.Query.build(%{invisible: true, select: [:ap_id]})
1164 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1166 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1169 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1170 from(activity in query, where: activity.id != ^id)
1173 defp exclude_id(query, _), do: query
1175 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1177 defp maybe_preload_objects(query, _) do
1179 |> Activity.with_preloaded_object()
1182 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1184 defp maybe_preload_bookmarks(query, opts) do
1186 |> Activity.with_preloaded_bookmark(opts[:user])
1189 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1191 |> Activity.with_preloaded_report_notes()
1194 defp maybe_preload_report_notes(query, _), do: query
1196 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1198 defp maybe_set_thread_muted_field(query, opts) do
1200 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1203 defp maybe_order(query, %{order: :desc}) do
1205 |> order_by(desc: :id)
1208 defp maybe_order(query, %{order: :asc}) do
1210 |> order_by(asc: :id)
1213 defp maybe_order(query, _), do: query
1215 defp fetch_activities_query_ap_ids_ops(opts) do
1216 source_user = opts[:muting_user]
1217 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1219 ap_id_relationships =
1220 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1221 [:block | ap_id_relationships]
1226 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1228 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1229 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1231 restrict_muted_reblogs_opts =
1232 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1234 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1237 def fetch_activities_query(recipients, opts \\ %{}) do
1238 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1239 fetch_activities_query_ap_ids_ops(opts)
1242 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1247 |> maybe_preload_objects(opts)
1248 |> maybe_preload_bookmarks(opts)
1249 |> maybe_preload_report_notes(opts)
1250 |> maybe_set_thread_muted_field(opts)
1251 |> maybe_order(opts)
1252 |> restrict_recipients(recipients, opts[:user])
1253 |> restrict_replies(opts)
1254 |> restrict_since(opts)
1255 |> restrict_local(opts)
1256 |> restrict_actor(opts)
1257 |> restrict_type(opts)
1258 |> restrict_state(opts)
1259 |> restrict_favorited_by(opts)
1260 |> restrict_blocked(restrict_blocked_opts)
1261 |> restrict_muted(restrict_muted_opts)
1262 |> restrict_filtered(opts)
1263 |> restrict_media(opts)
1264 |> restrict_visibility(opts)
1265 |> restrict_thread_visibility(opts, config)
1266 |> restrict_reblogs(opts)
1267 |> restrict_pinned(opts)
1268 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1269 |> restrict_instance(opts)
1270 |> restrict_announce_object_actor(opts)
1271 |> restrict_filtered(opts)
1272 |> Activity.restrict_deactivated_users()
1273 |> exclude_poll_votes(opts)
1274 |> exclude_chat_messages(opts)
1275 |> exclude_invisible_actors(opts)
1276 |> exclude_visibility(opts)
1279 Config.object_embedded_hashtags?() ->
1281 |> restrict_tag(opts)
1282 |> restrict_tag_reject(opts)
1283 |> restrict_tag_all(opts)
1285 # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
1286 Config.improved_hashtag_timeline() == :join ->
1288 |> distinct([activity], true)
1289 |> restrict_hashtag_any(opts)
1290 |> restrict_hashtag_all(opts)
1291 |> restrict_hashtag_reject_any(opts)
1294 restrict_hashtag(query, opts)
1298 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1299 list_memberships = Pleroma.List.memberships(opts[:user])
1301 fetch_activities_query(recipients ++ list_memberships, opts)
1302 |> Pagination.fetch_paginated(opts, pagination)
1304 |> maybe_update_cc(list_memberships, opts[:user])
1308 Fetch favorites activities of user with order by sort adds to favorites
1310 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1311 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1313 |> Activity.Queries.by_actor()
1314 |> Activity.Queries.by_type("Like")
1315 |> Activity.with_joined_object()
1316 |> Object.with_joined_activity()
1317 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1318 |> order_by([like, _, _], desc_nulls_last: like.id)
1319 |> Pagination.fetch_paginated(
1320 Map.merge(params, %{skip_order: true}),
1325 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1326 Enum.map(activities, fn
1327 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1328 if Enum.any?(bcc, &(&1 in list_memberships)) do
1329 update_in(activity.data["cc"], &[user_ap_id | &1])
1339 defp maybe_update_cc(activities, _, _), do: activities
1341 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1342 from(activity in query,
1344 fragment("? && ?", activity.recipients, ^recipients) or
1345 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1346 ^Constants.as_public() in activity.recipients)
1350 def fetch_activities_bounded(
1352 recipients_with_public,
1354 pagination \\ :keyset
1356 fetch_activities_query([], opts)
1357 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1358 |> Pagination.fetch_paginated(opts, pagination)
1362 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1363 def upload(file, opts \\ []) do
1364 with {:ok, data} <- Upload.store(file, opts) do
1365 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1367 Repo.insert(%Object{data: obj_data})
1371 @spec get_actor_url(any()) :: binary() | nil
1372 defp get_actor_url(url) when is_binary(url), do: url
1373 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1375 defp get_actor_url(url) when is_list(url) do
1381 defp get_actor_url(_url), do: nil
1383 defp object_to_user_data(data) do
1385 data["icon"]["url"] &&
1388 "url" => [%{"href" => data["icon"]["url"]}]
1392 data["image"]["url"] &&
1395 "url" => [%{"href" => data["image"]["url"]}]
1400 |> Map.get("attachment", [])
1401 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1402 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1406 |> Map.get("tag", [])
1408 %{"type" => "Emoji"} -> true
1411 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1412 {String.trim(name, ":"), url}
1415 is_locked = data["manuallyApprovesFollowers"] || false
1416 capabilities = data["capabilities"] || %{}
1417 accepts_chat_messages = capabilities["acceptsChatMessages"]
1418 data = Transmogrifier.maybe_fix_user_object(data)
1419 is_discoverable = data["discoverable"] || false
1420 invisible = data["invisible"] || false
1421 actor_type = data["type"] || "Person"
1424 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1425 data["publicKey"]["publicKeyPem"]
1431 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1432 data["endpoints"]["sharedInbox"]
1439 uri: get_actor_url(data["url"]),
1444 is_locked: is_locked,
1445 is_discoverable: is_discoverable,
1446 invisible: invisible,
1449 follower_address: data["followers"],
1450 following_address: data["following"],
1451 bio: data["summary"] || "",
1452 actor_type: actor_type,
1453 also_known_as: Map.get(data, "alsoKnownAs", []),
1454 public_key: public_key,
1455 inbox: data["inbox"],
1456 shared_inbox: shared_inbox,
1457 accepts_chat_messages: accepts_chat_messages
1460 # nickname can be nil because of virtual actors
1461 if data["preferredUsername"] do
1465 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1468 Map.put(user_data, :nickname, nil)
1472 def fetch_follow_information_for_user(user) do
1473 with {:ok, following_data} <-
1474 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1475 {:ok, hide_follows} <- collection_private(following_data),
1476 {:ok, followers_data} <-
1477 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1478 {:ok, hide_followers} <- collection_private(followers_data) do
1481 hide_follows: hide_follows,
1482 follower_count: normalize_counter(followers_data["totalItems"]),
1483 following_count: normalize_counter(following_data["totalItems"]),
1484 hide_followers: hide_followers
1487 {:error, _} = e -> e
1492 defp normalize_counter(counter) when is_integer(counter), do: counter
1493 defp normalize_counter(_), do: 0
1495 def maybe_update_follow_information(user_data) do
1496 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1497 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1499 {:collections_available,
1500 !!(user_data[:following_address] && user_data[:follower_address])},
1502 fetch_follow_information_for_user(user_data) do
1503 info = Map.merge(user_data[:info] || %{}, info)
1506 |> Map.put(:info, info)
1508 {:user_type_check, false} ->
1511 {:collections_available, false} ->
1514 {:enabled, false} ->
1519 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1526 defp collection_private(%{"first" => %{"type" => type}})
1527 when type in ["CollectionPage", "OrderedCollectionPage"],
1530 defp collection_private(%{"first" => first}) do
1531 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1532 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1535 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1536 {:error, _} = e -> e
1541 defp collection_private(_data), do: {:ok, true}
1543 def user_data_from_user_object(data) do
1544 with {:ok, data} <- MRF.filter(data) do
1545 {:ok, object_to_user_data(data)}
1551 def fetch_and_prepare_user_from_ap_id(ap_id) do
1552 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1553 {:ok, data} <- user_data_from_user_object(data) do
1554 {:ok, maybe_update_follow_information(data)}
1556 # If this has been deleted, only log a debug and not an error
1557 {:error, "Object has been deleted" = e} ->
1558 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1561 {:error, {:reject, reason} = e} ->
1562 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1566 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1571 def maybe_handle_clashing_nickname(data) do
1572 with nickname when is_binary(nickname) <- data[:nickname],
1573 %User{} = old_user <- User.get_by_nickname(nickname),
1574 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1576 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1582 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1583 |> User.update_and_set_cache()
1585 {:ap_id_comparison, true} ->
1587 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1595 def make_user_from_ap_id(ap_id) do
1596 user = User.get_cached_by_ap_id(ap_id)
1598 if user && !User.ap_enabled?(user) do
1599 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1601 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1604 |> User.remote_user_changeset(data)
1605 |> User.update_and_set_cache()
1607 maybe_handle_clashing_nickname(data)
1610 |> User.remote_user_changeset()
1618 def make_user_from_nickname(nickname) do
1619 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1620 make_user_from_ap_id(ap_id)
1622 _e -> {:error, "No AP id in WebFinger"}
1626 # filter out broken threads
1627 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1628 entire_thread_visible_for_user?(activity, user)
1631 # do post-processing on a specific activity
1632 def contain_activity(%Activity{} = activity, %User{} = user) do
1633 contain_broken_threads(activity, user)
1636 def fetch_direct_messages_query do
1638 |> restrict_type(%{type: "Create"})
1639 |> restrict_visibility(%{visibility: "direct"})
1640 |> order_by([activity], asc: activity.id)