1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Gathers the addressing lists ("to", "cc", "bcc") of an activity map into one
# recipients list. For "Create" activities the actor is appended and duplicates are
# dropped; every other type is a plain concatenation without de-duplication.
# NOTE(review): the tail of each clause is outside this view; callers in this file
# destructure the result as `{recipients, _, _}`.
# NOTE(review): a missing "actor" falls back to `[]`, which is then wrapped as `[[]]`
# and would put an empty list among the recipients — confirm this is intended.
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Fallback for every non-"Create" activity: no actor, no Enum.uniq/1.
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
# Actor gate used by insert/4. A nil actor always passes; for a binary actor id the
# cached user is looked up and a user with `is_active: true` yields true.
# NOTE(review): the remaining `case` branches are not visible in this view.
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
# Size gate used by insert/4: when the embedded object carries non-nil "content", its
# String.length/1 must not exceed the configured [:instance, :remote_limit].
# Data without such content always passes.
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
# Calls User.increase_note_count/1 for `actor` when `object` is public according to
# is_public?/1; otherwise returns `{:ok, actor}` without touching the counter.
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Calls User.decrease_note_count/1 for `actor` when `object` is public according to
# is_public?/1; otherwise returns `{:ok, actor}` without touching the counter.
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# For create data whose embedded object has an "inReplyTo" id, bumps the replies count
# of the replied-to object when the embedded object is public. Any other input is :noop.
# NOTE(review): part of the first clause's head and its non-public branch are not
# visible in this view.
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
# Object types that persist/2 stores through Object.create/1 instead of as activities.
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# Clause for the object types listed above.
# NOTE(review): the success body of this clause is not visible in this view.
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
# Generic clause: inserts an %Activity{} whose recipients come from get_recipients/1
# and whose actor is object["actor"], schedules expiration if applicable, and returns
# `{:ok, activity, meta}`. `meta` must contain `:local` (Keyword.fetch!/2 raises otherwise).
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
# Inserts an activity map. The `with` chain proceeds only when, in order:
#   * Activity.normalize/1 returns nil for the map,
#   * the actor check passes (skipped when `bypass_actor_check` is truthy),
#   * the remote content limit passes,
#   * MRF.filter/1 returns `{:ok, map}`,
#   * `fake` is false,
#   * Containment.contain_child/1 returns :ok,
#   * the embedded object and then the activity itself are inserted.
# Tagged tuples (`:fake`, `:containment`, `:remote_limit_pass`, ...) identify the
# failing step in the else branches.
# NOTE(review): several else-branch bodies are not visible in this view.
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
# Rich media data is fetched in a separately started Task, wrapped in ConcurrentLimiter.limit/2.
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
# Activity.normalize/1 returned an existing activity instead of nil.
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
# Fake insert: an %Activity{} struct is built in memory in this branch.
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
# Builds a struct from `data` (actor and recipients are among its fields), inserts it
# with Repo.insert/1 and, on success, hands the activity to
# maybe_create_activity_expiration/1.
# NOTE(review): the full struct construction is not visible in this view.
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
# Post-insert side effects for an activity: creates notifications, creates or bumps
# the conversation on behalf of the activity's actor, and streams the resulting
# participations.
# NOTE(review): some lines of this function are not visible in this view.
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
# When the activity data carries an "expires_at" %DateTime{}, enqueues a
# Pleroma.Workers.PurgeExpiredActivity job with the activity id and expiry time.
# Activities without such a value are returned as `{:ok, activity}`.
# NOTE(review): how the enqueue result is mapped to a return value is not visible here.
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity`; when `actor` resolves to a cached
# %User{}, that user's participation in the conversation is marked as read.
# NOTE(review): the value returned after mark_as_read is not visible in this view;
# get_participations/1 expects `{:ok, conversation}` on success.
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
# Returns the participations of a successfully resolved conversation, force-reloading
# the :participations association; any other input yields an empty list.
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
# Preloads :user and pushes the participations to the "participation" stream.
# NOTE(review): the intermediate lines of this pipeline are not visible in this view.
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
# Two-argument variant keyed on an object's "context": looks up the conversation for
# that context, preloads its participations and streams them only when a latest direct
# activity id exists for the context. Any other argument shape is :noop.
# NOTE(review): the options passed to the id lookup are not visible in this view.
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed by
# Topics.get_activity_topics/1.
# NOTE(review): the body of the fallback clause is not visible in this view.
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
# Public entry point for creating an activity: runs do_create/2 inside a
# Repo.transaction/1.
# NOTE(review): how `result` is returned to the caller is not visible in this view.
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body of create/2. Builds the create data, inserts it, then (for non-fake,
# non-benchmark runs) updates reply and note counters, notifies/streams and federates.
# The `:fake` and `:quick_insert` tagged tuples short-circuit the chain; any other
# failure rolls the transaction back.
# NOTE(review): the construction of `create_data` and the branch bodies are only
# partly visible in this view.
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # Only an explicit `false` makes the activity non-local; nil or a missing key means local.
271 local = !(params[:local] == false)
272 published = params[:published]
# Benchmark environment flag: checked after the replies count step, before the note count.
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
# Inserts a listen activity built from `params`, then notifies/streams and federates it.
# NOTE(review): the construction of `listen_data` and the success value are not
# visible in this view.
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # Only an explicit `false` makes the activity non-local; nil or a missing key means local.
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
# Public entry point for unfollowing: runs do_unfollow/4 inside a Repo.transaction/1.
# NOTE(review): how `result` is returned to the caller is not visible in this view.
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of unfollow/4: finds the latest follow activity between the two
# users, marks it "cancelled", inserts the unfollow activity built from it, then
# notifies/streams and federates. `{:error, error}` rolls the transaction back.
# NOTE(review): the success value and other else branches are not visible in this view.
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
# Public entry point for reports: runs do_flag/1 inside a Repo.transaction/1.
# NOTE(review): the `def` line and the handling of `result` are not visible in this view.
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# NOTE(review): the function head is outside this view; these lines are presumably the
# body of do_flag/1, which flag/1 runs inside a transaction.
# Visible behavior: builds the flag data, inserts it, strips report status data for
# federation, notifies/streams, and sends an AdminEmail report to every superuser who
# is not the reporting actor and has an email address. `{:error, error}` rolls back.
360 # Only an explicit `false` disables these flags; nil or a missing key means true.
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
# Two addressing variants: "cc" holds the reported account's ap_id, or is empty.
# NOTE(review): the condition choosing between them is not visible (presumably `forward`).
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
390 {:error, error} -> Repo.rollback(error)
# Inserts a move activity from `origin` to `target`. The target must list the origin's
# ap_id in `also_known_as`; otherwise an error tuple with an explanatory message is
# returned. After insertion the activity is notified/streamed and federated, and a
# "move_following" background job is enqueued with both user ids.
# NOTE(review): the full params map and the success value are not visible in this view.
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id
403 with true <- origin.ap_id in target.also_known_as,
404 {:ok, activity} <- insert(params, local),
405 _ <- notify_and_stream(activity) do
406 maybe_federate(activity)
408 BackgroundWorker.enqueue("move_following", %{
409 "origin_id" => origin.id,
410 "target_id" => target.id
415 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities in a given context: applies the optional preloads,
# block/recipient/filter restrictions, a type-and-context condition on jsonb data,
# poll-vote exclusion, and orders by activity id descending.
# With a user in opts, recipients are the user's ap_id, followed ids and the public
# address.
# NOTE(review): the recipients fallback and the fragment arguments are not visible here.
420 def fetch_activities_for_context_query(context, opts) do
421 public = [Constants.as_public()]
425 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
428 from(activity in Activity)
429 |> maybe_preload_objects(opts)
430 |> maybe_preload_bookmarks(opts)
431 |> maybe_set_thread_muted_field(opts)
432 |> restrict_blocked(opts)
433 |> restrict_recipients(recipients, opts[:user])
434 |> restrict_filtered(opts)
438 "?->>'type' = ? and ?->>'context' = ?",
445 |> exclude_poll_votes(opts)
447 |> order_by([activity], desc: activity.id)
# Returns the activities of `context`, using fetch_activities_for_context_query/2.
# NOTE(review): the final step that executes the query is not visible in this view.
450 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
451 def fetch_activities_for_context(context, opts \\ %{}) do
453 |> fetch_activities_for_context_query(opts)
# Looks up the id of the latest direct-visibility activity in `context`. Preloading is
# skipped by default (`skip_preload: true`), though a `:skip_preload` key in `opts`
# overrides that because `opts` is merged last.
# NOTE(review): the lines selecting/limiting the result are not visible in this view.
457 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
458 FlakeId.Ecto.CompatType.t() | nil
459 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
461 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
462 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection. The :user key is
# dropped from `opts` first; unlisted activities are filtered only when
# `restrict_unlisted: true` is set in `opts`.
# NOTE(review): any post-processing after pagination is not visible in this view.
468 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
469 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
470 opts = Map.delete(opts, :user)
472 [Constants.as_public()]
473 |> fetch_activities_query(opts)
474 |> restrict_unlisted(opts)
475 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but forces `restrict_unlisted: true`,
# so unlisted activities are excluded.
478 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
479 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
481 |> Map.put(:restrict_unlisted, true)
482 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
485 @valid_visibilities ~w[direct unlisted public private]
# Restricts the query to activities whose `activity_visibility(...)` SQL result matches
# the requested visibility (a single value or a list). Invalid values are logged as an
# error; opts without a :visibility key leave the query untouched.
# NOTE(review): what the error paths return is not visible in this view.
487 defp restrict_visibility(query, %{visibility: visibility})
488 when is_list(visibility) do
489 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
494 "activity_visibility(?, ?, ?) = ANY (?)",
502 Logger.error("Could not restrict visibility to #{visibility}")
506 defp restrict_visibility(query, %{visibility: visibility})
507 when visibility in @valid_visibilities do
511 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
515 defp restrict_visibility(_query, %{visibility: visibility})
516 when visibility not in @valid_visibilities do
517 Logger.error("Could not restrict visibility to #{visibility}")
520 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by the :exclude_visibilities option
# (a single value or a list). Invalid non-nil values are logged as an error; opts
# without the key, or with a nil value, leave the query untouched.
# NOTE(review): the surrounding query code (presumably negating the fragment) and the
# return value of the error paths are not visible in this view.
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when is_list(visibility) do
524 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
529 "activity_visibility(?, ?, ?) = ANY (?)",
537 Logger.error("Could not exclude visibility to #{visibility}")
542 defp exclude_visibility(query, %{exclude_visibilities: visibility})
543 when visibility in @valid_visibilities do
548 "activity_visibility(?, ?, ?) = ?",
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility not in [nil | @valid_visibilities] do
559 Logger.error("Could not exclude visibility to #{visibility}")
563 defp exclude_visibility(query, _visibility), do: query
# Applies the `thread_visibility(ap_id, activity id)` SQL check for the requesting user.
# Two clauses match `skip_thread_containment: true` (in the config map or on the user);
# without a user the query is returned unchanged.
# NOTE(review): the bodies of the first two clauses are on lines not visible here
# (presumably `do: query`).
565 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
568 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
571 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
574 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
578 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`: params get :user and
# :actor_id set, recipients are derived via user_activities_recipients/1 from the
# godmode flag and reading user, and fetch_activities/2 runs the query.
# NOTE(review): other param adjustments and post-processing are not visible here.
580 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
583 |> Map.put(:user, reading_user)
584 |> Map.put(:actor_id, user.ap_id)
587 godmode: params[:godmode],
588 reading_user: reading_user
590 |> user_activities_recipients()
591 |> fetch_activities(params)
# Fetches a user's "Create"/"Announce" activities as seen by `reading_user`.
# With `total: true` the helper's result is treated as a keyword list whose :items
# entry is reversed; otherwise the helper is used in a pipeline.
# NOTE(review): the tail of the non-total pipeline is not visible in this view.
595 def fetch_user_activities(user, reading_user, params \\ %{})
597 def fetch_user_activities(user, reading_user, %{total: true} = params) do
598 result = fetch_activities_for_user(user, reading_user, params)
600 Keyword.put(result, :items, Enum.reverse(result[:items]))
603 def fetch_user_activities(user, reading_user, params) do
605 |> fetch_activities_for_user(reading_user, params)
# Shared implementation for fetch_user_activities/3: restricts params to
# "Create"/"Announce" by `user`, passes the user's pinned activity ids, and picks the
# pagination type from params (default :keyset).
# NOTE(review): the :blocking_user/:muting_user puts belong to one branch of the
# User.blocks?/2 conditional; which branch is not visible in this view.
609 defp fetch_activities_for_user(user, reading_user, params) do
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
618 if User.blocks?(reading_user, user) do
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
626 pagination_type = Map.get(params, :pagination_type) || :keyset
629 godmode: params[:godmode],
630 reading_user: reading_user
632 |> user_activities_recipients()
633 |> fetch_activities(params, pagination_type)
# Fetches "Create"/"Announce" activities visible to `reading_user`. With `total: true`
# the helper's keyword result has its :items entry reversed; otherwise the helper is
# used in a pipeline.
# NOTE(review): the tail of the non-total pipeline is not visible in this view.
636 def fetch_statuses(reading_user, %{total: true} = params) do
637 result = fetch_activities_for_reading_user(reading_user, params)
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
641 def fetch_statuses(reading_user, params) do
643 |> fetch_activities_for_reading_user(params)
# Shared implementation for fetch_statuses/2: forces the type filter to
# "Create"/"Announce", derives recipients from the godmode flag and reading user, and
# fetches with offset pagination.
647 defp fetch_activities_for_reading_user(reading_user, params) do
648 params = Map.put(params, :type, ["Create", "Announce"])
651 godmode: params[:godmode],
652 reading_user: reading_user
654 |> user_activities_recipients()
655 |> fetch_activities(params, :offset)
# Computes the recipients list used to scope a user-activities query.
# In godmode the list is empty, which restrict_recipients/3 treats as "no restriction".
# Otherwise the result is either the public address plus the reading user's ap_id and
# followed ids, or the public address alone.
# NOTE(review): the condition choosing between the two (presumably whether
# `reading_user` is present) is not visible in this view.
658 defp user_activities_recipients(%{godmode: true}), do: []
660 defp user_activities_recipients(%{reading_user: reading_user}) do
662 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
664 [Constants.as_public()]
# Filter driven by :announce_filtering_user, built from a type/actor inequality
# condition over the joined object. Needs the object join, so it raises when combined
# with `skip_preload: true`; without the option the query is unchanged.
# NOTE(review): the fragment's bound arguments are not visible in this view.
668 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
669 raise "Can't use the child object without preloading!"
672 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
674 [activity, object] in query,
677 "?->>'type' != ? or ?->>'actor' != ?",
686 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than :since_id. An empty-string since_id
# or a missing key leaves the query unchanged.
688 defp restrict_since(query, %{since_id: ""}), do: query
690 defp restrict_since(query, %{since_id: since_id}) do
691 from(activity in query, where: activity.id > ^since_id)
694 defp restrict_since(query, _), do: query
# Embedded-tag variant of the "all tags" filter: requires the object's jsonb 'tag'
# value to contain every (downcased) tag in :tag_all, via the jsonb `?&` operator.
# A single binary tag is delegated to restrict_embedded_tag_any/2. Raises when
# combined with `skip_preload: true` because the object join is required.
696 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
697 raise_on_missing_preload()
700 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
701 tag_all = Enum.map(tag_all, &String.downcase/1)
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
709 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
710 restrict_embedded_tag_any(query, %{tag: tag})
713 defp restrict_embedded_tag_all(query, _), do: query
# Embedded-tag variant of the "any tag" filter: requires the object's jsonb 'tag'
# value to contain at least one (downcased) tag in :tag, via the jsonb `?|` operator.
# A single binary tag is wrapped in a list. Raises when combined with
# `skip_preload: true` because the object join is required.
715 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
716 raise_on_missing_preload()
719 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
720 tag_any = Enum.map(tag_any, &String.downcase/1)
723 [_activity, object] in query,
724 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
728 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
729 restrict_embedded_tag_any(query, %{tag: [tag]})
732 defp restrict_embedded_tag_any(query, _), do: query
# Embedded-tag variant of the "reject tags" filter: drops activities whose object's
# jsonb 'tag' value contains any (downcased) tag in :tag_reject. A single binary tag
# is wrapped in a list. Raises when combined with `skip_preload: true`.
734 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
735 raise_on_missing_preload()
738 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
739 tag_reject = Enum.map(tag_reject, &String.downcase/1)
742 [_activity, object] in query,
743 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
747 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
748 when is_binary(tag_reject) do
749 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
752 defp restrict_embedded_tag_reject_any(query, _), do: query
# Hashtag-table variant of the "all tags" filter. A one-element list is delegated to
# restrict_hashtag_any/2; longer lists compare the aggregated hashtag names linked to
# the object (hashtags JOIN hashtags_objects) against the requested tags with `@>`.
# A single binary tag is wrapped in a list. Raises when combined with `skip_preload: true`.
# NOTE(review): the fragment's bound arguments are not visible in this view.
754 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
755 raise_on_missing_preload()
758 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
759 restrict_hashtag_any(query, %{tag: single_tag})
762 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
764 [_activity, object] in query,
768 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
769 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
770 AND hashtags_objects.object_id = ?) @> ?
779 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
780 restrict_hashtag_all(query, %{tag_all: [tag]})
783 defp restrict_hashtag_all(query, _), do: query
# Hashtag-table variant of the "any tag" filter. Two strategies exist: when
# [:database, :improved_hashtag_timeline] is :preselect_hashtag_ids, hashtag rows are
# queried by name first and the subquery checks hashtags_objects by id; otherwise one
# EXISTS subquery joins hashtags and hashtags_objects by name. A single binary tag is
# wrapped in a list. Raises when combined with `skip_preload: true`.
# NOTE(review): the fragment's bound arguments and parts of the SQL are not visible here.
785 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
786 raise_on_missing_preload()
789 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
790 # TODO: refactor: debug / experimental feature
791 if Config.get([:database, :improved_hashtag_timeline]) == :preselect_hashtag_ids do
793 from(ht in Pleroma.Hashtag,
794 where: fragment("name = ANY(?::citext[])", ^tags),
800 [_activity, object] in query,
805 SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1)
813 [_activity, object] in query,
817 EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
818 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
819 AND hashtags_objects.object_id = ? LIMIT 1)
828 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
829 restrict_hashtag_any(query, %{tag: [tag]})
832 defp restrict_hashtag_any(query, _), do: query
# Hashtag-table variant of the "reject tags" filter: keeps only activities for which
# no hashtag named in :tag_reject is linked to the object (NOT EXISTS subquery).
# A single binary tag is wrapped in a list. Raises when combined with `skip_preload: true`.
# NOTE(review): the fragment's bound arguments are not visible in this view.
834 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
835 raise_on_missing_preload()
838 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
840 [_activity, object] in query,
844 NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
845 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
846 AND hashtags_objects.object_id = ? LIMIT 1)
854 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
855 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
858 defp restrict_hashtag_reject_any(query, _), do: query
# Raises a RuntimeError; used by the tag filters when an object-dependent filter is
# requested together with `skip_preload: true`.
860 defp raise_on_missing_preload do
861 raise "Can't use the child object without preloading!"
# Scopes the query by recipients using the PostgreSQL array-overlap operator `&&`.
#   * empty recipients list: no restriction;
#   * no user: activity.recipients must overlap `recipients`;
#   * with a user: as above, with an additional `or_where` on the user's own ap_id
#     as actor.
864 defp restrict_recipients(query, [], _user), do: query
866 defp restrict_recipients(query, recipients, nil) do
867 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
870 defp restrict_recipients(query, recipients, user) do
873 where: fragment("? && ?", ^recipients, activity.recipients),
874 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged local; otherwise unchanged.
878 defp restrict_local(query, %{local_only: true}) do
879 from(activity in query, where: activity.local == true)
882 defp restrict_local(query, _), do: query
# With `remote: true`, keeps only activities not flagged local; otherwise unchanged.
884 defp restrict_remote(query, %{remote: true}) do
885 from(activity in query, where: activity.local == false)
888 defp restrict_remote(query, _), do: query
# With an :actor_id option, keeps only activities whose actor equals it.
890 defp restrict_actor(query, %{actor_id: actor_id}) do
891 from(activity in query, where: activity.actor == ^actor_id)
894 defp restrict_actor(query, _), do: query
# Filters on the activity's jsonb 'type': a binary :type is compared for equality,
# any other value is bound as the right-hand side of `= ANY(?)`.
896 defp restrict_type(query, %{type: type}) when is_binary(type) do
897 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
900 defp restrict_type(query, %{type: type}) do
901 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
904 defp restrict_type(query, _), do: query
# With a :state option, keeps only activities whose jsonb 'state' equals it.
906 defp restrict_state(query, %{state: state}) do
907 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
910 defp restrict_state(query, _), do: query
# With a :favorited_by ap_id, keeps only activities whose joined object's jsonb
# 'likes' value contains that ap_id (jsonb `?` operator).
912 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
914 [_activity, object] in query,
915 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
919 defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps "Create" activities whose object's 'attachment' is not
# equal to the bound empty list. Requires the object join, so it raises when
# :only_media is combined with `skip_preload: true`.
921 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
922 raise "Can't use the child object without preloading!"
925 defp restrict_media(query, %{only_media: true}) do
927 [activity, object] in query,
928 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
929 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
933 defp restrict_media(query, _), do: query
# Reply filtering, selected by options:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` with a filtering user keeps non-replies or rows
#     matching an `= ANY(...)` membership test;
#   * `reply_visibility: "following"` with a filtering user uses the SQL condition
#     documented inline, bound to the user's ap_id plus cached friend ap_ids.
# Any other opts leave the query unchanged.
# NOTE(review): most fragment arguments are not visible in this view.
935 defp restrict_replies(query, %{exclude_replies: true}) do
937 [_activity, object] in query,
938 where: fragment("?->>'inReplyTo' is null", object.data)
942 defp restrict_replies(query, %{
943 reply_filtering_user: %User{} = user,
944 reply_visibility: "self"
947 [activity, object] in query,
950 "?->>'inReplyTo' is null OR ? = ANY(?)",
958 defp restrict_replies(query, %{
959 reply_filtering_user: %User{} = user,
960 reply_visibility: "following"
963 [activity, object] in query,
967 ?->>'type' != 'Create' -- This isn't a Create
968 OR ?->>'inReplyTo' is null -- this isn't a reply
969 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
970 -- unless they are the author (because authors
971 -- are also part of the recipients). This leads
972 -- to a bug that self-replies by friends won't
974 OR ? = ? -- The actor is us
978 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
987 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities of type "Announce".
989 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
990 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
993 defp restrict_reblogs(query, _), do: query
# Hides activities from users muted by :muting_user. `with_muted: true` disables the
# filter. Muted ap_ids come from opts[:muted_users_ap_ids] when preloaded, otherwise
# from User.muted_users_ap_ids/1. Unless preloading is skipped, rows with a thread
# mute (non-nil `thread_mute.user_id`) are also dropped.
# NOTE(review): the arguments of the second fragment are not visible in this view.
995 defp restrict_muted(query, %{with_muted: true}), do: query
997 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
998 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1001 from([activity] in query,
1002 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1005 "not (?->'to' \\?| ?) or ? = ?",
1013 unless opts[:skip_preload] do
1014 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1020 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by :blocking_user. Blocked ap_ids
# come from opts[:blocked_users_ap_ids] when preloaded, otherwise from
# User.blocked_users_ap_ids/1. The object join is added if the query lacks an
# :object named binding. Conditions cover the actor, recipients, recipient domains,
# announces addressed to blocked users, and the actor's/object actor's domain.
# NOTE(review): most fragment arguments are not visible in this view; the domain
# conditions carry an `or ... = ANY(?)` exemption, presumably for followed accounts.
1022 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1023 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1024 domain_blocks = user.domain_blocks || []
1026 following_ap_ids = User.get_friends_ap_ids(user)
1029 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1032 [activity, object: o] in query,
1033 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1036 "((not (? && ?)) or ? = ?)",
1037 activity.recipients,
1044 "recipients_contain_blocked_domains(?, ?) = false",
1045 activity.recipients,
1050 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1057 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1065 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1074 defp restrict_blocked(query, _), do: query
# With `restrict_unlisted: true`, drops rows whose 'cc' (coalesced to an empty jsonb
# value when absent) contains the public address.
# NOTE(review): the jsonb value the fragment reads 'cc' from is bound on a line not
# visible in this view.
1076 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1081 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1083 ^[Constants.as_public()]
1088 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and :pinned_activity_ids, keeps only activities whose id is in
# that list; otherwise unchanged.
1090 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1091 from(activity in query, where: activity.id in ^ids)
1094 defp restrict_pinned(query, _), do: query
# Hides "Announce" activities matched against the reblog-muted ap_ids of :muting_user.
# Those ap_ids come from opts[:reblog_muted_users_ap_ids] when preloaded, otherwise
# from User.reblog_muted_users_ap_ids/1.
# NOTE(review): the fragment's bound arguments are not visible in this view.
1096 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1097 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1103 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1111 defp restrict_muted_reblogs(query, _), do: query
# With a binary :instance, keeps only activities whose actor URL has that value as its
# third '/'-separated segment (the host part of an `https://host/...` URL).
1113 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1116 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1120 defp restrict_instance(query, _), do: query
# Applies the user's content filters: the regex composed by Filter.compose_regex/1 is
# matched case-insensitively (`~*`) against the object's "content"; matching rows are
# dropped unless the activity's actor is the user. A :blocking_user is accepted as a
# fallback source for the user.
# NOTE(review): the `case` patterns (e.g. when no regex is composed) are not visible here.
1122 defp restrict_filtered(query, %{user: %User{} = user}) do
1123 case Filter.compose_regex(user) do
1128 from([activity, object] in query,
1130 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1131 activity.actor == ^user.ap_id
1136 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1137 restrict_filtered(query, %{user: user})
1140 defp restrict_filtered(query, _), do: query
# Drops activities whose joined object is of type "Answer" (poll votes), unless
# `include_poll_votes: true`. The filter is applied when the query has an :object
# named binding.
# NOTE(review): the branch taken without that binding is not visible in this view.
1142 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1144 defp exclude_poll_votes(query, _) do
1145 if has_named_binding?(query, :object) do
1146 from([activity, object: o] in query,
1147 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Drops activities whose joined object is of type "ChatMessage", unless
# `include_chat_messages: true`. The filter is applied when the query has an :object
# named binding.
# NOTE(review): the branch taken without that binding is not visible in this view.
1154 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1156 defp exclude_chat_messages(query, _) do
1157 if has_named_binding?(query, :object) do
1158 from([activity, object: o] in query,
1159 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Drops activities authored by invisible users, unless `invisible_actors: true`.
# The ap_ids are taken from a User.Query for `invisible: true` users and bound into a
# `not in` condition.
# NOTE(review): the lines that run that query and bind `invisible_ap_ids` are only
# partly visible in this view.
1166 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1168 defp exclude_invisible_actors(query, _opts) do
1170 User.Query.build(%{invisible: true, select: [:ap_id]})
1172 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1174 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id, drops the activity with that id; otherwise unchanged.
1177 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1178 from(activity in query, where: activity.id != ^id)
1181 defp exclude_id(query, _), do: query
# Adds the object preload via Activity.with_preloaded_object/1 unless
# `skip_preload: true`.
1183 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1185 defp maybe_preload_objects(query, _) do
1187 |> Activity.with_preloaded_object()
# Adds the bookmark preload for opts[:user] via Activity.with_preloaded_bookmark/2
# unless `skip_preload: true`.
1190 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1192 defp maybe_preload_bookmarks(query, opts) do
1194 |> Activity.with_preloaded_bookmark(opts[:user])
# Adds the report-notes preload only when `preload_report_notes: true` is requested.
1197 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1199 |> Activity.with_preloaded_report_notes()
1202 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for opts[:muting_user], falling back to opts[:user],
# via Activity.with_set_thread_muted_field/2 unless `skip_preload: true`.
1204 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1206 defp maybe_set_thread_muted_field(query, opts) do
1208 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by :id descending or ascending when opts[:order] is :desc or :asc; any other
# value leaves the query's ordering untouched.
1211 defp maybe_order(query, %{order: :desc}) do
1213 |> order_by(desc: :id)
1216 defp maybe_order(query, %{order: :asc}) do
1218 |> order_by(asc: :id)
1221 defp maybe_order(query, _), do: query
# Preloads the muting user's outgoing relationship ap_ids in one call and returns three
# opts maps (for restrict_blocked/2, restrict_muted/2, restrict_muted_reblogs/2) with
# the preloaded lists merged in. Keys already present in `opts` win because `opts` is
# merged last. :block is only preloaded when :blocking_user is the same as :muting_user.
# NOTE(review): the else branch of the :block conditional is not visible in this view.
1223 defp fetch_activities_query_ap_ids_ops(opts) do
1224 source_user = opts[:muting_user]
1225 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1227 ap_id_relationships =
1228 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1229 [:block | ap_id_relationships]
1234 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1236 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1237 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1239 restrict_muted_reblogs_opts =
1240 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1242 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query for `recipients`, applying every optional preload,
# restriction and exclusion in this module according to `opts`. The block/mute filters
# receive opts enriched with preloaded ap_id lists. Tag filtering uses the hashtag-table
# variants when [:database, :improved_hashtag_timeline] is truthy, and the embedded-tag
# variants otherwise.
# NOTE(review): restrict_filtered/2 is applied twice in this pipeline with the same
# opts; the second application looks redundant — confirm before removing.
# NOTE(review): the start of the pipeline and the `config` map construction are only
# partly visible in this view.
1245 def fetch_activities_query(recipients, opts \\ %{}) do
1246 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1247 fetch_activities_query_ap_ids_ops(opts)
1250 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1255 |> maybe_preload_objects(opts)
1256 |> maybe_preload_bookmarks(opts)
1257 |> maybe_preload_report_notes(opts)
1258 |> maybe_set_thread_muted_field(opts)
1259 |> maybe_order(opts)
1260 |> restrict_recipients(recipients, opts[:user])
1261 |> restrict_replies(opts)
1262 |> restrict_since(opts)
1263 |> restrict_local(opts)
1264 |> restrict_remote(opts)
1265 |> restrict_actor(opts)
1266 |> restrict_type(opts)
1267 |> restrict_state(opts)
1268 |> restrict_favorited_by(opts)
1269 |> restrict_blocked(restrict_blocked_opts)
1270 |> restrict_muted(restrict_muted_opts)
1271 |> restrict_filtered(opts)
1272 |> restrict_media(opts)
1273 |> restrict_visibility(opts)
1274 |> restrict_thread_visibility(opts, config)
1275 |> restrict_reblogs(opts)
1276 |> restrict_pinned(opts)
1277 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1278 |> restrict_instance(opts)
1279 |> restrict_announce_object_actor(opts)
1280 |> restrict_filtered(opts)
1281 |> Activity.restrict_deactivated_users()
1282 |> exclude_poll_votes(opts)
1283 |> exclude_chat_messages(opts)
1284 |> exclude_invisible_actors(opts)
1285 |> exclude_visibility(opts)
1287 if Config.get([:database, :improved_hashtag_timeline]) do
1289 |> restrict_hashtag_any(opts)
1290 |> restrict_hashtag_all(opts)
1291 |> restrict_hashtag_reject_any(opts)
1294 |> restrict_embedded_tag_any(opts)
1295 |> restrict_embedded_tag_all(opts)
1296 |> restrict_embedded_tag_reject_any(opts)
# Runs fetch_activities_query/2 with the user's list memberships appended to
# `recipients`, paginates the result, and passes the activities through
# maybe_update_cc/3.
# NOTE(review): one pipeline step between pagination and maybe_update_cc/3 is not
# visible in this view.
1300 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1301 list_memberships = Pleroma.List.memberships(opts[:user])
1303 fetch_activities_query(recipients ++ list_memberships, opts)
1304 |> Pagination.fetch_paginated(opts, pagination)
1306 |> maybe_update_cc(list_memberships, opts[:user])
1310 Fetches the user's favourited activities, ordered by the id of the Like activity (descending).
1312 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
# Starts from "Like" activities by the actor, joins the liked object and that object's
# own activity, and selects that activity with the object attached and `pagination_id`
# set to the Like's id. Ordering is fixed here (Like id, descending, nulls last), so
# pagination is called with `skip_order: true`.
# NOTE(review): the start of the pipeline and the remaining pagination arguments are
# not visible in this view.
1313 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1315 |> Activity.Queries.by_actor()
1316 |> Activity.Queries.by_type("Like")
1317 |> Activity.with_joined_object()
1318 |> Object.with_joined_activity()
1319 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1320 |> order_by([like, _, _], desc_nulls_last: like.id)
1321 |> Pagination.fetch_paginated(
1322 Map.merge(params, %{skip_order: true}),
# For a user with at least one list membership: every activity whose non-empty "bcc"
# contains one of those memberships gets the user's ap_id prepended to data["cc"].
# With no memberships, or no %User{}, the activities are returned untouched.
# NOTE(review): the branches for non-matching activities are not visible in this view.
1327 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1328 Enum.map(activities, fn
1329 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1330 if Enum.any?(bcc, &(&1 in list_memberships)) do
1331 update_in(activity.data["cc"], &[user_ap_id | &1])
1341 defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities whose recipients array either
#   * overlaps `recipients` (PostgreSQL array-overlap operator `&&`), or
#   * overlaps `recipients_with_public` AND also contains the public
#     collection address returned by `Constants.as_public/0`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  from(activity in query,
    where:
      fragment("? && ?", activity.recipients, ^recipients) or
        (fragment("? && ?", activity.recipients, ^recipients_with_public) and
           ^Constants.as_public() in activity.recipients)
  )
end
@doc """
Fetches activities using two recipient sets with different reach.

An activity is returned when its recipients overlap `recipients`, or when they
overlap `recipients_with_public` and the activity is also addressed to the
public collection (see `fetch_activities_bounded_query/3`). The usual `opts`
filters of `fetch_activities_query/2` apply, and the paginated result is
reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores `file` through `Upload.store/2` and persists the result as an `Object`.

When `opts[:actor]` is given it is recorded under the "actor" key of the
object data. Any non-`{:ok, _}` result of `Upload.store/2` is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    obj_data = Maps.put_if_present(data, "actor", opts[:actor])

    Repo.insert(%Object{data: obj_data})
  end
end
# Extracts a single URL string from an actor's "url" property.
#
# Accepts a plain string, a map with a binary "href", or a list of either
# (only the first element of a list is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href

defp get_actor_url(url) when is_list(url) do
  url
  |> List.first()
  |> get_actor_url()
end

defp get_actor_url(_url), do: nil
# Converts a remote actor document (string-keyed map) into the atom-keyed
# attribute map used to build or update a remote user.
#
# The document comes from another server, so optional parts are read
# defensively: malformed "icon"/"image", "attachment" or "tag" entries are
# skipped instead of raising.
defp object_to_user_data(data) do
  avatar = actor_image_data(data["icon"])
  banner = actor_image_data(data["image"])

  # List.wrap/1 tolerates a missing key, an explicit null and a single
  # unwrapped object as well as the usual list.
  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => type} -> type == "PropertyValue"
      _ -> false
    end)
    |> Enum.map(fn field -> Map.take(field, ["name", "value"]) end)

  emojis =
    data["tag"]
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} when is_binary(name) ->
        [{String.trim(name, ":"), url}]

      _ ->
        []
    end)
    |> Map.new()

  # These are read before maybe_fix_user_object/1 rewrites the document;
  # everything below uses the fixed-up version.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end

# Builds the image attachment map for an actor's "icon"/"image" entry.
# Returns nil unless the entry is a map carrying a truthy "url".
defp actor_image_data(%{"url" => url}) when url not in [nil, false] do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp actor_image_data(_), do: nil
@doc """
Fetches the following and followers collections of a remote `user` and derives
follow statistics from them.

`user` only needs `following_address` and `follower_address`. On success
returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`; each count is the collection's
"totalItems", or 0 when that is not an integer. Any failure is returned as
`{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following_data} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following_data),
       {:ok, followers_data} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers_data) do
    {:ok,
     %{
       hide_follows: hide_follows,
       follower_count: normalize_counter(followers_data["totalItems"]),
       following_count: normalize_counter(following_data["totalItems"]),
       hide_followers: hide_followers
     }}
  else
    {:error, _} = e -> e
    e -> {:error, e}
  end
end
# Coerces a counter value into an integer: integers pass through untouched,
# every other value (nil, strings, floats, ...) becomes 0.
defp normalize_counter(counter) do
  case counter do
    count when is_integer(count) -> count
    _other -> 0
  end
end
@doc """
Adds remote follow statistics to `user_data` under `:info` when possible.

The statistics are fetched only if `:external_user_synchronization` is
enabled, the actor type is "Person" or "Service", and both collection
addresses are present. In every other case, including a failed fetch (which is
logged), `user_data` is returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the type check reads `user_data[:type]`, while
  # object_to_user_data/1 in this module emits `:actor_type` — confirm this
  # check can pass for data coming from fetch_and_prepare_user_from_ap_id/1.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <-
         fetch_follow_information_for_user(user_data) do
    info = Map.merge(user_data[:info] || %{}, info)

    user_data
    |> Map.put(:info, info)
  else
    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    {:enabled, false} ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
# Decides whether a follow collection is hidden from us.
#
#   * "first" is an embedded page     -> {:ok, false} (readable)
#   * "first" is a reference          -> fetch it; a page means readable,
#                                        a 401/403 answer means hidden,
#                                        any other failure is an error
#   * no "first" at all               -> {:ok, true} (treated as hidden)
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
         Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, false}
  else
    {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
    {:error, _} = e -> e
    e -> {:error, e}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs a remote actor document through `MRF.filter/1` and converts it into user
attributes.

Returns `{:ok, user_data}` when the filter accepts the document. Any other
filter result `e` is returned wrapped as `{:error, e}`.
"""
def user_data_from_user_object(data) do
  with {:ok, data} <- MRF.filter(data) do
    {:ok, object_to_user_data(data)}
  else
    e -> {:error, e}
  end
end
@doc """
Fetches the actor document at `ap_id` and turns it into user attributes.

Returns `{:ok, user_data}` (with follow information merged in when
`maybe_update_follow_information/1` applies) or `{:error, reason}`. The log
level of a failure depends on its cause: deleted objects are logged at debug,
rejections at info, everything else at error.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, maybe_update_follow_information(data)}
  else
    # If this has been deleted, only log a debug and not an error
    {:error, "Object has been deleted" = e} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    {:error, {:reject, reason} = e} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
      {:error, e}

    {:error, e} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
@doc """
Frees up `data[:nickname]` when a different, older user already holds it.

If a user with the same nickname but another AP id exists, that user is
renamed to `"<id>.<nickname>"` and the result of the update is returned. If the
AP ids are identical nothing is changed and the situation is only logged. When
there is no nickname or no such user, `nil` is returned.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    old_user
    |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
@doc """
Creates or refreshes the local record for the remote actor at `ap_id`.

  * A cached user for which `User.ap_enabled?/1` is false is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Otherwise the actor is fetched. An existing user is updated from the
    fetched data; a new one is inserted and cached, after a clashing nickname
    (if any) has been handled.

A failed fetch is returned unchanged as `{:error, reason}`.
"""
def make_user_from_ap_id(ap_id) do
  user = User.get_cached_by_ap_id(ap_id)

  if user && !User.ap_enabled?(user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
      if user do
        user
        |> User.remote_user_changeset(data)
        |> User.update_and_set_cache()
      else
        maybe_handle_clashing_nickname(data)

        data
        |> User.remote_user_changeset()
        |> Repo.insert()
        |> User.set_cache()
      end
    end
  end
end
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found.

Returns whatever `make_user_from_ap_id/1` returns, or
`{:error, "No AP id in WebFinger"}` when the lookup fails or yields no AP id.
"""
def make_user_from_nickname(nickname) do
  with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
    make_user_from_ap_id(ap_id)
  else
    _e -> {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns the result of
# entire_thread_visible_for_user?/2 for `activity` and `user`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
  entire_thread_visible_for_user?(activity, user)
end
# Post-processing check for a single activity. Currently this only applies
# the broken-thread containment for `user`.
def contain_activity(%Activity{} = activity, %User{} = user) do
  contain_broken_threads(activity, user)
end
@doc """
Builds a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  Activity
  |> restrict_type(%{type: "Create"})
  |> restrict_visibility(%{visibility: "direct"})
  |> order_by([activity], asc: activity.id)
end