1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
389 {:error, error} -> Repo.rollback(error)
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
437 "?->>'type' = ? and ?->>'context' = ?",
444 |> exclude_poll_votes(opts)
446 |> order_by([activity], desc: activity.id)
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(opts)
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
484 @valid_visibilities ~w[direct unlisted public private]
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
493 "activity_visibility(?, ?, ?) = ANY (?)",
501 Logger.error("Could not restrict visibility to #{visibility}")
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
519 defp restrict_visibility(query, _visibility), do: query
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 "activity_visibility(?, ?, ?) = ANY (?)",
536 Logger.error("Could not exclude visibility to #{visibility}")
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
547 "activity_visibility(?, ?, ?) = ?",
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
562 defp exclude_visibility(query, _visibility), do: query
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
577 defp restrict_thread_visibility(query, _, _), do: query
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
586 godmode: params[:godmode],
587 reading_user: reading_user
589 |> user_activities_recipients()
590 |> fetch_activities(params)
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
603 if User.blocks?(reading_user, user) do
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
611 pagination_type = Map.get(params, :pagination_type) || :keyset
614 godmode: params[:godmode],
615 reading_user: reading_user
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
626 godmode: params[:godmode],
627 reading_user: reading_user
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
634 defp user_activities_recipients(%{godmode: true}), do: []
636 defp user_activities_recipients(%{reading_user: reading_user}) do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
640 [Constants.as_public()]
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
650 [activity, object] in query,
653 "?->>'type' != ? or ?->>'actor' != ?",
662 defp restrict_announce_object_actor(query, _), do: query
664 defp restrict_since(query, %{since_id: ""}), do: query
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
670 defp restrict_since(query, _), do: query
672 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
673 raise_on_missing_preload()
676 defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
678 [_activity, object] in query,
679 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
683 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
684 restrict_embedded_tag_any(query, %{tag: tag})
687 defp restrict_embedded_tag_all(query, _), do: query
689 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
690 raise_on_missing_preload()
693 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do
695 [_activity, object] in query,
696 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
700 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
701 restrict_embedded_tag_any(query, %{tag: [tag]})
704 defp restrict_embedded_tag_any(query, _), do: query
706 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
707 raise_on_missing_preload()
710 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
711 when is_list(tag_reject) do
713 [_activity, object] in query,
714 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
718 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
719 when is_binary(tag_reject) do
720 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
723 defp restrict_embedded_tag_reject_any(query, _), do: query
725 # Groups by all bindings to allow aggregation on hashtags
726 defp group_by_all_bindings(query) do
727 # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
729 Enum.count(query.aliases) == 4 ->
730 from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
732 Enum.count(query.aliases) == 3 ->
733 from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
735 Enum.count(query.aliases) == 2 ->
736 from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
739 from([a, o] in query, group_by: [a.id, o.id])
743 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
744 raise_on_missing_preload()
747 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
749 |> group_by_all_bindings()
750 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
753 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
757 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
758 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
761 defp restrict_hashtag_reject_any(query, _), do: query
763 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
764 raise_on_missing_preload()
767 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
771 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
775 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
776 restrict_hashtag_any(query, %{tag: tag})
779 defp restrict_hashtag_all(query, _), do: query
781 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
782 raise_on_missing_preload()
785 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
788 [_activity, object] in query,
789 join: hashtag in assoc(object, :hashtags),
790 where: hashtag.name in ^tags
793 if length(tags) > 1 do
794 distinct(query, [activity], true)
800 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
801 restrict_hashtag_any(query, %{tag: [tag]})
804 defp restrict_hashtag_any(query, _), do: query
806 defp raise_on_missing_preload do
807 raise "Can't use the child object without preloading!"
810 defp restrict_recipients(query, [], _user), do: query
812 defp restrict_recipients(query, recipients, nil) do
813 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
816 defp restrict_recipients(query, recipients, user) do
819 where: fragment("? && ?", ^recipients, activity.recipients),
820 or_where: activity.actor == ^user.ap_id
824 defp restrict_local(query, %{local_only: true}) do
825 from(activity in query, where: activity.local == true)
828 defp restrict_local(query, _), do: query
830 defp restrict_actor(query, %{actor_id: actor_id}) do
831 from(activity in query, where: activity.actor == ^actor_id)
834 defp restrict_actor(query, _), do: query
836 defp restrict_type(query, %{type: type}) when is_binary(type) do
837 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
840 defp restrict_type(query, %{type: type}) do
841 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
844 defp restrict_type(query, _), do: query
846 defp restrict_state(query, %{state: state}) do
847 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
850 defp restrict_state(query, _), do: query
852 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
854 [_activity, object] in query,
855 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
859 defp restrict_favorited_by(query, _), do: query
861 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
862 raise "Can't use the child object without preloading!"
865 defp restrict_media(query, %{only_media: true}) do
867 [activity, object] in query,
868 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
869 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
873 defp restrict_media(query, _), do: query
875 defp restrict_replies(query, %{exclude_replies: true}) do
877 [_activity, object] in query,
878 where: fragment("?->>'inReplyTo' is null", object.data)
882 defp restrict_replies(query, %{
883 reply_filtering_user: %User{} = user,
884 reply_visibility: "self"
887 [activity, object] in query,
890 "?->>'inReplyTo' is null OR ? = ANY(?)",
898 defp restrict_replies(query, %{
899 reply_filtering_user: %User{} = user,
900 reply_visibility: "following"
903 [activity, object] in query,
907 ?->>'type' != 'Create' -- This isn't a Create
908 OR ?->>'inReplyTo' is null -- this isn't a reply
909 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
910 -- unless they are the author (because authors
911 -- are also part of the recipients). This leads
912 -- to a bug that self-replies by friends won't
914 OR ? = ? -- The actor is us
918 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
927 defp restrict_replies(query, _), do: query
929 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
930 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
933 defp restrict_reblogs(query, _), do: query
935 defp restrict_muted(query, %{with_muted: true}), do: query
937 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
938 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
941 from([activity] in query,
942 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
945 "not (?->'to' \\?| ?) or ? = ?",
953 unless opts[:skip_preload] do
954 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
960 defp restrict_muted(query, _), do: query
962 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
963 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
964 domain_blocks = user.domain_blocks || []
966 following_ap_ids = User.get_friends_ap_ids(user)
969 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
972 [activity, object: o] in query,
973 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
976 "((not (? && ?)) or ? = ?)",
984 "recipients_contain_blocked_domains(?, ?) = false",
990 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
997 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1005 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1014 defp restrict_blocked(query, _), do: query
1016 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1021 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1023 ^[Constants.as_public()]
1028 defp restrict_unlisted(query, _), do: query
1030 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1031 from(activity in query, where: activity.id in ^ids)
1034 defp restrict_pinned(query, _), do: query
1036 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1037 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1043 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1051 defp restrict_muted_reblogs(query, _), do: query
1053 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1056 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1060 defp restrict_instance(query, _), do: query
1062 defp restrict_filtered(query, %{user: %User{} = user}) do
1063 case Filter.compose_regex(user) do
1068 from([activity, object] in query,
1070 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1071 activity.actor == ^user.ap_id
1076 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1077 restrict_filtered(query, %{user: user})
1080 defp restrict_filtered(query, _), do: query
1082 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1084 defp exclude_poll_votes(query, _) do
1085 if has_named_binding?(query, :object) do
1086 from([activity, object: o] in query,
1087 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1094 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1096 defp exclude_chat_messages(query, _) do
1097 if has_named_binding?(query, :object) do
1098 from([activity, object: o] in query,
1099 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1106 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1108 defp exclude_invisible_actors(query, _opts) do
1110 User.Query.build(%{invisible: true, select: [:ap_id]})
1112 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1114 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1117 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1118 from(activity in query, where: activity.id != ^id)
1121 defp exclude_id(query, _), do: query
1123 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1125 defp maybe_preload_objects(query, _) do
1127 |> Activity.with_preloaded_object()
1130 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1132 defp maybe_preload_bookmarks(query, opts) do
1134 |> Activity.with_preloaded_bookmark(opts[:user])
1137 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1139 |> Activity.with_preloaded_report_notes()
1142 defp maybe_preload_report_notes(query, _), do: query
1144 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1146 defp maybe_set_thread_muted_field(query, opts) do
1148 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1151 defp maybe_order(query, %{order: :desc}) do
1153 |> order_by(desc: :id)
1156 defp maybe_order(query, %{order: :asc}) do
1158 |> order_by(asc: :id)
1161 defp maybe_order(query, _), do: query
1163 defp fetch_activities_query_ap_ids_ops(opts) do
1164 source_user = opts[:muting_user]
1165 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1167 ap_id_relationships =
1168 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1169 [:block | ap_id_relationships]
1174 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1176 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1177 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1179 restrict_muted_reblogs_opts =
1180 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1182 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1185 def fetch_activities_query(recipients, opts \\ %{}) do
1186 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1187 fetch_activities_query_ap_ids_ops(opts)
1190 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1195 |> maybe_preload_objects(opts)
1196 |> maybe_preload_bookmarks(opts)
1197 |> maybe_preload_report_notes(opts)
1198 |> maybe_set_thread_muted_field(opts)
1199 |> maybe_order(opts)
1200 |> restrict_recipients(recipients, opts[:user])
1201 |> restrict_replies(opts)
1202 |> restrict_since(opts)
1203 |> restrict_local(opts)
1204 |> restrict_actor(opts)
1205 |> restrict_type(opts)
1206 |> restrict_state(opts)
1207 |> restrict_favorited_by(opts)
1208 |> restrict_blocked(restrict_blocked_opts)
1209 |> restrict_muted(restrict_muted_opts)
1210 |> restrict_filtered(opts)
1211 |> restrict_media(opts)
1212 |> restrict_visibility(opts)
1213 |> restrict_thread_visibility(opts, config)
1214 |> restrict_reblogs(opts)
1215 |> restrict_pinned(opts)
1216 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1217 |> restrict_instance(opts)
1218 |> restrict_announce_object_actor(opts)
1219 |> restrict_filtered(opts)
1220 |> Activity.restrict_deactivated_users()
1221 |> exclude_poll_votes(opts)
1222 |> exclude_chat_messages(opts)
1223 |> exclude_invisible_actors(opts)
1224 |> exclude_visibility(opts)
1226 if Config.improved_hashtag_timeline() do
1228 |> restrict_hashtag_any(opts)
1229 |> restrict_hashtag_all(opts)
1230 |> restrict_hashtag_reject_any(opts)
1233 |> restrict_embedded_tag_any(opts)
1234 |> restrict_embedded_tag_all(opts)
1235 |> restrict_embedded_tag_reject_any(opts)
1239 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1240 list_memberships = Pleroma.List.memberships(opts[:user])
1242 fetch_activities_query(recipients ++ list_memberships, opts)
1243 |> Pagination.fetch_paginated(opts, pagination)
1245 |> maybe_update_cc(list_memberships, opts[:user])
1249 Fetch favorites activities of user with order by sort adds to favorites
1251 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1252 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1254 |> Activity.Queries.by_actor()
1255 |> Activity.Queries.by_type("Like")
1256 |> Activity.with_joined_object()
1257 |> Object.with_joined_activity()
1258 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1259 |> order_by([like, _, _], desc_nulls_last: like.id)
1260 |> Pagination.fetch_paginated(
1261 Map.merge(params, %{skip_order: true}),
1266 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1267 Enum.map(activities, fn
1268 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1269 if Enum.any?(bcc, &(&1 in list_memberships)) do
1270 update_in(activity.data["cc"], &[user_ap_id | &1])
1280 defp maybe_update_cc(activities, _, _), do: activities
1282 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1283 from(activity in query,
1285 fragment("? && ?", activity.recipients, ^recipients) or
1286 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1287 ^Constants.as_public() in activity.recipients)
1291 def fetch_activities_bounded(
1293 recipients_with_public,
1295 pagination \\ :keyset
1297 fetch_activities_query([], opts)
1298 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1299 |> Pagination.fetch_paginated(opts, pagination)
1303 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1304 def upload(file, opts \\ []) do
1305 with {:ok, data} <- Upload.store(file, opts) do
1306 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1308 Repo.insert(%Object{data: obj_data})
1312 @spec get_actor_url(any()) :: binary() | nil
1313 defp get_actor_url(url) when is_binary(url), do: url
1314 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1316 defp get_actor_url(url) when is_list(url) do
1322 defp get_actor_url(_url), do: nil
1324 defp object_to_user_data(data) do
1326 data["icon"]["url"] &&
1329 "url" => [%{"href" => data["icon"]["url"]}]
1333 data["image"]["url"] &&
1336 "url" => [%{"href" => data["image"]["url"]}]
1341 |> Map.get("attachment", [])
1342 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1343 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1347 |> Map.get("tag", [])
1349 %{"type" => "Emoji"} -> true
1352 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1353 {String.trim(name, ":"), url}
1356 is_locked = data["manuallyApprovesFollowers"] || false
1357 capabilities = data["capabilities"] || %{}
1358 accepts_chat_messages = capabilities["acceptsChatMessages"]
1359 data = Transmogrifier.maybe_fix_user_object(data)
1360 is_discoverable = data["discoverable"] || false
1361 invisible = data["invisible"] || false
1362 actor_type = data["type"] || "Person"
1365 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1366 data["publicKey"]["publicKeyPem"]
1372 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1373 data["endpoints"]["sharedInbox"]
1380 uri: get_actor_url(data["url"]),
1385 is_locked: is_locked,
1386 is_discoverable: is_discoverable,
1387 invisible: invisible,
1390 follower_address: data["followers"],
1391 following_address: data["following"],
1392 bio: data["summary"] || "",
1393 actor_type: actor_type,
1394 also_known_as: Map.get(data, "alsoKnownAs", []),
1395 public_key: public_key,
1396 inbox: data["inbox"],
1397 shared_inbox: shared_inbox,
1398 accepts_chat_messages: accepts_chat_messages
1401 # nickname can be nil because of virtual actors
1402 if data["preferredUsername"] do
1406 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1409 Map.put(user_data, :nickname, nil)
1413 def fetch_follow_information_for_user(user) do
1414 with {:ok, following_data} <-
1415 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1416 {:ok, hide_follows} <- collection_private(following_data),
1417 {:ok, followers_data} <-
1418 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1419 {:ok, hide_followers} <- collection_private(followers_data) do
1422 hide_follows: hide_follows,
1423 follower_count: normalize_counter(followers_data["totalItems"]),
1424 following_count: normalize_counter(following_data["totalItems"]),
1425 hide_followers: hide_followers
1428 {:error, _} = e -> e
1433 defp normalize_counter(counter) when is_integer(counter), do: counter
1434 defp normalize_counter(_), do: 0
1436 def maybe_update_follow_information(user_data) do
1437 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1438 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1440 {:collections_available,
1441 !!(user_data[:following_address] && user_data[:follower_address])},
1443 fetch_follow_information_for_user(user_data) do
1444 info = Map.merge(user_data[:info] || %{}, info)
1447 |> Map.put(:info, info)
1449 {:user_type_check, false} ->
1452 {:collections_available, false} ->
1455 {:enabled, false} ->
1460 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1467 defp collection_private(%{"first" => %{"type" => type}})
1468 when type in ["CollectionPage", "OrderedCollectionPage"],
1471 defp collection_private(%{"first" => first}) do
1472 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1473 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1476 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1477 {:error, _} = e -> e
1482 defp collection_private(_data), do: {:ok, true}
1484 def user_data_from_user_object(data) do
1485 with {:ok, data} <- MRF.filter(data) do
1486 {:ok, object_to_user_data(data)}
1492 def fetch_and_prepare_user_from_ap_id(ap_id) do
1493 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1494 {:ok, data} <- user_data_from_user_object(data) do
1495 {:ok, maybe_update_follow_information(data)}
1497 # If this has been deleted, only log a debug and not an error
1498 {:error, "Object has been deleted" = e} ->
1499 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1502 {:error, {:reject, reason} = e} ->
1503 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1507 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1512 def maybe_handle_clashing_nickname(data) do
1513 with nickname when is_binary(nickname) <- data[:nickname],
1514 %User{} = old_user <- User.get_by_nickname(nickname),
1515 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1517 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1523 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1524 |> User.update_and_set_cache()
1526 {:ap_id_comparison, true} ->
1528 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1536 def make_user_from_ap_id(ap_id) do
1537 user = User.get_cached_by_ap_id(ap_id)
1539 if user && !User.ap_enabled?(user) do
1540 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1542 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1545 |> User.remote_user_changeset(data)
1546 |> User.update_and_set_cache()
1548 maybe_handle_clashing_nickname(data)
1551 |> User.remote_user_changeset()
1559 def make_user_from_nickname(nickname) do
1560 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1561 make_user_from_ap_id(ap_id)
1563 _e -> {:error, "No AP id in WebFinger"}
1567 # filter out broken threads
1568 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1569 entire_thread_visible_for_user?(activity, user)
1572 # do post-processing on a specific activity
1573 def contain_activity(%Activity{} = activity, %User{} = user) do
1574 contain_broken_threads(activity, user)
1577 def fetch_direct_messages_query do
1579 |> restrict_type(%{type: "Create"})
1580 |> restrict_visibility(%{visibility: "direct"})
1581 |> order_by([activity], asc: activity.id)