1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
54 defp check_actor_is_active(nil), do: true
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
63 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
64 limit = Config.get([:instance, :remote_limit])
65 String.length(content) <= limit
68 defp check_remote_limit(_), do: true
# Increments the actor's note count, but only for public objects.
#
# When `is_public?(object)` is truthy the result of
# `User.increase_note_count(actor)` is returned; otherwise the actor is
# handed back unchanged as `{:ok, actor}`.
70 def increase_note_count_if_public(actor, object) do
71 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Decrements the actor's note count, but only for public objects.
#
# When `is_public?(object)` is truthy the result of
# `User.decrease_note_count(actor)` is returned; otherwise the actor is
# handed back unchanged as `{:ok, actor}`.
74 def decrease_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
87 defp increase_replies_count_if_reply(_create_data), do: :noop
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
102 Repo.insert(%Activity{
105 recipients: recipients,
106 actor: object["actor"]
108 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
135 %Activity{} = activity ->
141 {:containment, _} = error ->
144 {:error, _} = error ->
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
152 recipients: recipients,
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
167 defp insert_activity_with_expiration(data, local, recipients) do
171 actor: data["actor"],
172 recipients: recipients
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
186 stream_out_participations(participations)
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
211 defp get_participations({:ok, conversation}) do
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
217 defp get_participations(_), do: []
219 def stream_out_participations(participations) do
222 |> Repo.preload(:user)
224 Streamer.stream("participation", participations)
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
243 def stream_out_participations(_, _), do: :noop
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
252 def stream_out(_activity) do
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
272 %{to: to, actor: actor, published: published, context: context, object: object},
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
285 {:quick_insert, true, activity} ->
288 {:fake, true, activity} ->
292 Repo.rollback(message)
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
305 %{to: to, actor: actor, published: published, context: context, object: object},
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
335 {:error, error} -> Repo.rollback(error)
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
359 additional = params[:additional] || %{}
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
365 Map.merge(additional, %{"to" => [], "cc" => []})
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
384 {:error, error} -> Repo.rollback(error)
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
479 @valid_visibilities ~w[direct unlisted public private]
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
607 godmode: params[:godmode],
608 reading_user: reading_user
610 |> user_activities_recipients()
611 |> fetch_activities(params)
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
619 godmode: params[:godmode],
620 reading_user: reading_user
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
627 defp user_activities_recipients(%{godmode: true}), do: []
629 defp user_activities_recipients(%{reading_user: reading_user}) do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
633 [Constants.as_public()]
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
643 [activity, object] in query,
646 "?->>'type' != ? or ?->>'actor' != ?",
655 defp restrict_announce_object_actor(query, _), do: query
# Restricts the query to activities whose id is greater than `since_id`.
#
# An empty-string `since_id` is treated the same as a missing one: the query
# is returned untouched.
657 defp restrict_since(query, %{since_id: ""}), do: query
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
# No `since_id` option given: nothing to restrict.
663 defp restrict_since(query, _), do: query
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise_on_missing_preload()
669 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
676 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
677 restrict_tag_reject(query, %{tag_reject: [tag_reject]})
680 defp restrict_tag_reject(query, _), do: query
682 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
683 raise_on_missing_preload()
686 defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
688 [_activity, object] in query,
689 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
693 defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
694 restrict_tag(query, %{tag: tag})
697 defp restrict_tag_all(query, _), do: query
699 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
700 raise_on_missing_preload()
703 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
710 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
711 restrict_tag(query, %{tag: [tag]})
714 defp restrict_tag(query, _), do: query
716 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
717 raise_on_missing_preload()
720 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
721 if has_named_binding?(query, :thread_mute) do
723 [activity, object, thread_mute] in query,
724 group_by: [activity.id, object.id, thread_mute.id]
728 [activity, object] in query,
729 group_by: [activity.id, object.id]
732 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
735 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
739 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
740 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
743 defp restrict_hashtag_reject_any(query, _), do: query
745 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
746 raise_on_missing_preload()
749 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
753 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
757 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
758 restrict_hashtag_any(query, %{tag: tag})
761 defp restrict_hashtag_all(query, _), do: query
763 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
764 raise_on_missing_preload()
767 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
769 [_activity, object] in query,
770 join: hashtag in assoc(object, :hashtags),
771 where: hashtag.name in ^tags
775 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
776 restrict_hashtag_any(query, %{tag: [tag]})
779 defp restrict_hashtag_any(query, _), do: query
# Shared failure path for the tag/hashtag restrictions that are called with
# `skip_preload: true`: they need the joined child object, so this raises a
# RuntimeError with a fixed message instead of building a query.
781 defp raise_on_missing_preload do
782 raise "Can't use the child object without preloading!"
785 defp restrict_recipients(query, [], _user), do: query
787 defp restrict_recipients(query, recipients, nil) do
788 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
791 defp restrict_recipients(query, recipients, user) do
794 where: fragment("? && ?", ^recipients, activity.recipients),
795 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities whose `local` column is true.
799 defp restrict_local(query, %{local_only: true}) do
800 from(activity in query, where: activity.local == true)
# Any other options (including `local_only: false`) leave the query as is.
803 defp restrict_local(query, _), do: query
# When an `actor_id` option is present, keeps only activities whose `actor`
# column equals it.
805 defp restrict_actor(query, %{actor_id: actor_id}) do
806 from(activity in query, where: activity.actor == ^actor_id)
# No `actor_id` option: the query is returned unchanged.
809 defp restrict_actor(query, _), do: query
# Restricts the query by the activity's `data->>'type'` value.
#
# A single binary type is compared for equality.
811 defp restrict_type(query, %{type: type}) when is_binary(type) do
812 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
# A non-binary `type` is matched with `= ANY(?)`, i.e. the activity type must
# be one of the supplied values (callers in this module pass a list such as
# ["Create", "Announce"]).
815 defp restrict_type(query, %{type: type}) do
816 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
# No `type` option: the query is returned unchanged.
819 defp restrict_type(query, _), do: query
# When a `state` option is present, keeps only activities whose
# `data->>'state'` equals it.
821 defp restrict_state(query, %{state: state}) do
822 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
# No `state` option: the query is returned unchanged.
825 defp restrict_state(query, _), do: query
827 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
829 [_activity, object] in query,
830 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
834 defp restrict_favorited_by(query, _), do: query
836 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
837 raise "Can't use the child object without preloading!"
840 defp restrict_media(query, %{only_media: true}) do
842 [activity, object] in query,
843 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
844 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
848 defp restrict_media(query, _), do: query
850 defp restrict_replies(query, %{exclude_replies: true}) do
852 [_activity, object] in query,
853 where: fragment("?->>'inReplyTo' is null", object.data)
857 defp restrict_replies(query, %{
858 reply_filtering_user: %User{} = user,
859 reply_visibility: "self"
862 [activity, object] in query,
865 "?->>'inReplyTo' is null OR ? = ANY(?)",
873 defp restrict_replies(query, %{
874 reply_filtering_user: %User{} = user,
875 reply_visibility: "following"
878 [activity, object] in query,
882 ?->>'type' != 'Create' -- This isn't a Create
883 OR ?->>'inReplyTo' is null -- this isn't a reply
884 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
885 -- unless they are the author (because authors
886 -- are also part of the recipients). This leads
887 -- to a bug that self-replies by friends won't
889 OR ? = ? -- The actor is us
893 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
902 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, filters out activities whose
# `data->>'type'` is 'Announce'.
904 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
905 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
# Any other options leave the query as is.
908 defp restrict_reblogs(query, _), do: query
910 defp restrict_muted(query, %{with_muted: true}), do: query
912 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
913 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
916 from([activity] in query,
917 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
920 "not (?->'to' \\?| ?) or ? = ?",
928 unless opts[:skip_preload] do
929 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
935 defp restrict_muted(query, _), do: query
937 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
938 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
939 domain_blocks = user.domain_blocks || []
941 following_ap_ids = User.get_friends_ap_ids(user)
944 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
947 [activity, object: o] in query,
948 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
951 "((not (? && ?)) or ? = ?)",
959 "recipients_contain_blocked_domains(?, ?) = false",
965 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
972 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
980 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
989 defp restrict_blocked(query, _), do: query
991 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
996 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
998 ^[Constants.as_public()]
1003 defp restrict_unlisted(query, _), do: query
# When both `pinned: true` and `pinned_activity_ids` are given, keeps only
# activities whose id is in that list.
1005 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1006 from(activity in query, where: activity.id in ^ids)
# Either option missing (or `pinned` not true): the query is returned unchanged.
1009 defp restrict_pinned(query, _), do: query
1011 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1012 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1018 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1026 defp restrict_muted_reblogs(query, _), do: query
1028 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1031 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1035 defp restrict_instance(query, _), do: query
1037 defp restrict_filtered(query, %{user: %User{} = user}) do
1038 case Filter.compose_regex(user) do
1043 from([activity, object] in query,
1045 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1046 activity.actor == ^user.ap_id
1051 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1052 restrict_filtered(query, %{user: user})
1055 defp restrict_filtered(query, _), do: query
1057 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1059 defp exclude_poll_votes(query, _) do
1060 if has_named_binding?(query, :object) do
1061 from([activity, object: o] in query,
1062 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1069 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1071 defp exclude_chat_messages(query, _) do
1072 if has_named_binding?(query, :object) do
1073 from([activity, object: o] in query,
1074 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1081 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1083 defp exclude_invisible_actors(query, _opts) do
1085 User.Query.build(%{invisible: true, select: [:ap_id]})
1087 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1089 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# When `exclude_id` is a binary, drops the activity with that id from the
# result set.
1092 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1093 from(activity in query, where: activity.id != ^id)
# Missing or non-binary `exclude_id`: the query is returned unchanged.
1096 defp exclude_id(query, _), do: query
1098 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1100 defp maybe_preload_objects(query, _) do
1102 |> Activity.with_preloaded_object()
1105 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1107 defp maybe_preload_bookmarks(query, opts) do
1109 |> Activity.with_preloaded_bookmark(opts[:user])
1112 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1114 |> Activity.with_preloaded_report_notes()
1117 defp maybe_preload_report_notes(query, _), do: query
1119 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1121 defp maybe_set_thread_muted_field(query, opts) do
1123 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1126 defp maybe_order(query, %{order: :desc}) do
1128 |> order_by(desc: :id)
1131 defp maybe_order(query, %{order: :asc}) do
1133 |> order_by(asc: :id)
1136 defp maybe_order(query, _), do: query
1138 defp fetch_activities_query_ap_ids_ops(opts) do
1139 source_user = opts[:muting_user]
1140 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1142 ap_id_relationships =
1143 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1144 [:block | ap_id_relationships]
1149 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1151 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1152 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1154 restrict_muted_reblogs_opts =
1155 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1157 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1160 def fetch_activities_query(recipients, opts \\ %{}) do
1161 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1162 fetch_activities_query_ap_ids_ops(opts)
1165 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1170 |> distinct([a], true)
1171 |> maybe_preload_objects(opts)
1172 |> maybe_preload_bookmarks(opts)
1173 |> maybe_preload_report_notes(opts)
1174 |> maybe_set_thread_muted_field(opts)
1175 |> maybe_order(opts)
1176 |> restrict_recipients(recipients, opts[:user])
1177 |> restrict_replies(opts)
1178 |> restrict_since(opts)
1179 |> restrict_local(opts)
1180 |> restrict_actor(opts)
1181 |> restrict_type(opts)
1182 |> restrict_state(opts)
1183 |> restrict_favorited_by(opts)
1184 |> restrict_blocked(restrict_blocked_opts)
1185 |> restrict_muted(restrict_muted_opts)
1186 |> restrict_filtered(opts)
1187 |> restrict_media(opts)
1188 |> restrict_visibility(opts)
1189 |> restrict_thread_visibility(opts, config)
1190 |> restrict_reblogs(opts)
1191 |> restrict_pinned(opts)
1192 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1193 |> restrict_instance(opts)
1194 |> restrict_announce_object_actor(opts)
1195 |> restrict_filtered(opts)
1196 |> Activity.restrict_deactivated_users()
1197 |> exclude_poll_votes(opts)
1198 |> exclude_chat_messages(opts)
1199 |> exclude_invisible_actors(opts)
1200 |> exclude_visibility(opts)
1202 if Config.get([:instance, :improved_hashtag_timeline]) do
1204 |> restrict_hashtag_any(opts)
1205 |> restrict_hashtag_all(opts)
1206 |> restrict_hashtag_reject_any(opts)
1209 |> restrict_tag(opts)
1210 |> restrict_tag_reject(opts)
1211 |> restrict_tag_all(opts)
1215 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1216 list_memberships = Pleroma.List.memberships(opts[:user])
1218 fetch_activities_query(recipients ++ list_memberships, opts)
1219 |> Pagination.fetch_paginated(opts, pagination)
1221 |> maybe_update_cc(list_memberships, opts[:user])
1225 Fetches the activities a user has favorited, ordered by when they were added to favorites.
1227 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1228 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1230 |> Activity.Queries.by_actor()
1231 |> Activity.Queries.by_type("Like")
1232 |> Activity.with_joined_object()
1233 |> Object.with_joined_activity()
1234 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1235 |> order_by([like, _, _], desc_nulls_last: like.id)
1236 |> Pagination.fetch_paginated(
1237 Map.merge(params, %{skip_order: true}),
1242 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1243 Enum.map(activities, fn
1244 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1245 if Enum.any?(bcc, &(&1 in list_memberships)) do
1246 update_in(activity.data["cc"], &[user_ap_id | &1])
1256 defp maybe_update_cc(activities, _, _), do: activities
1258 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1259 from(activity in query,
1261 fragment("? && ?", activity.recipients, ^recipients) or
1262 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1263 ^Constants.as_public() in activity.recipients)
1267 def fetch_activities_bounded(
1269 recipients_with_public,
1271 pagination \\ :keyset
1273 fetch_activities_query([], opts)
1274 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1275 |> Pagination.fetch_paginated(opts, pagination)
1279 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1280 def upload(file, opts \\ []) do
1281 with {:ok, data} <- Upload.store(file, opts) do
1282 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1284 Repo.insert(%Object{data: obj_data})
1288 @spec get_actor_url(any()) :: binary() | nil
1289 defp get_actor_url(url) when is_binary(url), do: url
1290 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1292 defp get_actor_url(url) when is_list(url) do
1298 defp get_actor_url(_url), do: nil
# Converts a remote actor object (string-keyed map `data`) into the
# atom-keyed attribute map used to build a user record.
#
# The visible parts read the icon and image URLs, profile fields from
# "attachment", custom emoji from "tag", assorted flags, the public key, the
# shared inbox, and finally derive the nickname from "preferredUsername" and
# the host of the actor id.
1300 defp object_to_user_data(data) do
# Icon and image are only wrapped when a URL is present; the URL is stored
# in the `"url" => [%{"href" => ...}]` shape.
1302 data["icon"]["url"] &&
1305 "url" => [%{"href" => data["icon"]["url"]}]
1309 data["image"]["url"] &&
1312 "url" => [%{"href" => data["image"]["url"]}]
# Profile fields: keep "PropertyValue" attachments, reduced to name/value.
# NOTE(review): the filter function has a single clause that requires a
# "type" key, so an attachment entry without one raises FunctionClauseError.
1317 |> Map.get("attachment", [])
1318 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1319 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: Emoji-typed tags become a map of name (surrounding colons
# trimmed) to icon URL.
1323 |> Map.get("tag", [])
1325 %{"type" => "Emoji"} -> true
1328 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1329 {String.trim(name, ":"), url}
# Missing flags default to false; `accepts_chat_messages` stays nil when the
# capability is absent.
1332 is_locked = data["manuallyApprovesFollowers"] || false
1333 capabilities = data["capabilities"] || %{}
1334 accepts_chat_messages = capabilities["acceptsChatMessages"]
# Everything read from `data` below this line sees the object as returned by
# Transmogrifier.maybe_fix_user_object/1.
1335 data = Transmogrifier.maybe_fix_user_object(data)
1336 is_discoverable = data["discoverable"] || false
1337 invisible = data["invisible"] || false
1338 actor_type = data["type"] || "Person"
# The public key and shared inbox are only read when the containing value is
# a map and the target value is a binary.
1341 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1342 data["publicKey"]["publicKeyPem"]
1348 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1349 data["endpoints"]["sharedInbox"]
1356 uri: get_actor_url(data["url"]),
1361 is_locked: is_locked,
1362 is_discoverable: is_discoverable,
1363 invisible: invisible,
1366 follower_address: data["followers"],
1367 following_address: data["following"],
1368 bio: data["summary"] || "",
1369 actor_type: actor_type,
1370 also_known_as: Map.get(data, "alsoKnownAs", []),
1371 public_key: public_key,
1372 inbox: data["inbox"],
1373 shared_inbox: shared_inbox,
1374 accepts_chat_messages: accepts_chat_messages
1377 # nickname can be nil because of virtual actors
1378 if data["preferredUsername"] do
# Nickname format: "<preferredUsername>@<host of the actor id>".
1382 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1385 Map.put(user_data, :nickname, nil)
# Fetches the remote "following" and "followers" collections of `user` and
# derives follow statistics from them.
#
# Each collection is fetched with
# `Fetcher.fetch_and_contain_remote_object_from_id/1` and checked with
# `collection_private/1`. The result carries `hide_follows`,
# `hide_followers`, and the two counters taken from each collection's
# "totalItems" (normalised by `normalize_counter/1`). An `{:error, _}` tuple
# from any step is returned unchanged.
1389 def fetch_follow_information_for_user(user) do
1390 with {:ok, following_data} <-
1391 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1392 {:ok, hide_follows} <- collection_private(following_data),
1393 {:ok, followers_data} <-
1394 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1395 {:ok, hide_followers} <- collection_private(followers_data) do
1398 hide_follows: hide_follows,
1399 follower_count: normalize_counter(followers_data["totalItems"]),
1400 following_count: normalize_counter(following_data["totalItems"]),
1401 hide_followers: hide_followers
1404 {:error, _} = e -> e
# Returns `counter` unchanged when it is an integer; any other value
# (nil, string, ...) is replaced by 0.
1409 defp normalize_counter(counter) when is_integer(counter), do: counter
1410 defp normalize_counter(_), do: 0
# Enriches `user_data` with follow information fetched from the remote
# instance, when all preconditions hold:
#   * `[:instance, :external_user_synchronization]` is enabled in Config;
#   * `user_data[:type]` is "Person" or "Service";
#   * both `:following_address` and `:follower_address` are set.
#
# On success the fetched info is merged over `user_data[:info]` (or an empty
# map) and stored back under `:info`. The bodies of the fallback branches are
# not visible in this extract; the last one builds a message starting with
# "Follower/Following counter update for ... failed.".
#
# NOTE(review): this reads `user_data[:type]`, while the keys visible in
# object_to_user_data/1 include `actor_type` -- confirm that `:type` is
# actually present on the map passed in.
1412 def maybe_update_follow_information(user_data) do
1413 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1414 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1416 {:collections_available,
1417 !!(user_data[:following_address] && user_data[:follower_address])},
1419 fetch_follow_information_for_user(user_data) do
1420 info = Map.merge(user_data[:info] || %{}, info)
1423 |> Map.put(:info, info)
1425 {:user_type_check, false} ->
1428 {:collections_available, false} ->
1431 {:enabled, false} ->
1436 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a follow collection's contents are hidden.
#
#   * "first" is an embedded page of type CollectionPage or
#     OrderedCollectionPage: handled by the first clause (result not visible
#     in this extract).
#   * "first" is any other value: it is fetched as a remote object. A 401 or
#     403 response yields `{:ok, true}`; other `{:error, _}` tuples are
#     returned unchanged.
#   * no "first" key at all: `{:ok, true}`.
1443 defp collection_private(%{"first" => %{"type" => type}})
1444 when type in ["CollectionPage", "OrderedCollectionPage"],
1447 defp collection_private(%{"first" => first}) do
1448 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1449 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# An authorization failure on the first page is treated as "private".
1452 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1453 {:error, _} = e -> e
1458 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through `MRF.filter/1` and, when the filter returns
# `{:ok, data}`, converts the filtered object with `object_to_user_data/1`,
# returning `{:ok, user_data}`.
1460 def user_data_from_user_object(data) do
1461 with {:ok, data} <- MRF.filter(data) do
1462 {:ok, object_to_user_data(data)}
# Fetches the actor object at `ap_id` and turns it into user data.
#
# On success returns `{:ok, data}`, where `data` has additionally been passed
# through `maybe_update_follow_information/1`. The visible failure branches
# log at a level that depends on the cause: debug for a deleted object, info
# for a `{:reject, reason}` error, and error for a further case whose match
# pattern is not visible in this extract. The values these branches return
# are not visible either.
1468 def fetch_and_prepare_user_from_ap_id(ap_id) do
1469 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1470 {:ok, data} <- user_data_from_user_object(data) do
1471 {:ok, maybe_update_follow_information(data)}
1473 # If this has been deleted, only log a debug and not an error
1474 {:error, "Object has been deleted" = e} ->
1475 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1478 {:error, {:reject, reason} = e} ->
1479 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1483 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees up a nickname that is already held by a different stored user.
#
# When `data[:nickname]` is a binary and `User.get_by_nickname/1` finds a
# user whose ap_id differs from `data[:ap_id]`, the old user's nickname is
# changed to "<old id>.<old nickname>" through `User.remote_user_changeset`
# and `User.update_and_set_cache`. When the ap_ids are equal nothing is
# renamed; the message built in that branch describes it as a possible race
# condition. The calls that emit the two messages are not visible in this
# extract.
1488 def maybe_handle_clashing_nickname(data) do
1489 with nickname when is_binary(nickname) <- data[:nickname],
1490 %User{} = old_user <- User.get_by_nickname(nickname),
1491 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1493 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1499 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1500 |> User.update_and_set_cache()
1502 {:ap_id_comparison, true} ->
1504 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Creates or refreshes the user identified by `ap_id`.
#
# A cached user that is not yet AP-enabled is upgraded through
# `Transmogrifier.upgrade_user_from_ap_id/1`. The other visible path fetches
# and prepares fresh user data; it contains a changeset update via
# `User.update_and_set_cache`, a call to `maybe_handle_clashing_nickname/1`,
# and a second `User.remote_user_changeset` pipeline.
#
# NOTE(review): presumably the update applies to an existing user and the
# nickname-clash handling plus second changeset to a new one -- the branch
# structure is not visible in this extract, confirm against the full file.
1512 def make_user_from_ap_id(ap_id) do
1513 user = User.get_cached_by_ap_id(ap_id)
1515 if user && !User.ap_enabled?(user) do
1516 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1518 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1521 |> User.remote_user_changeset(data)
1522 |> User.update_and_set_cache()
1524 maybe_handle_clashing_nickname(data)
1527 |> User.remote_user_changeset()
# Resolves `nickname` through `WebFinger.finger/1` and, when the result
# contains a non-nil "ap_id", delegates to `make_user_from_ap_id/1`.
# Any other WebFinger result yields `{:error, "No AP id in WebFinger"}`.
1535 def make_user_from_nickname(nickname) do
1536 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1537 make_user_from_ap_id(ap_id)
1539 _e -> {:error, "No AP id in WebFinger"}
# Returns the result of `entire_thread_visible_for_user?/2` for `activity`
# and `user`; both arguments must be the respective structs.
1543 # filter out broken threads
1544 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1545 entire_thread_visible_for_user?(activity, user)
# Public entry point for per-activity containment checks; currently it only
# delegates to `contain_broken_threads/2`.
1548 # do post-processing on a specific activity
1549 def contain_activity(%Activity{} = activity, %User{} = user) do
1550 contain_broken_threads(activity, user)
1553 def fetch_direct_messages_query do
1555 |> restrict_type(%{type: "Create"})
1556 |> restrict_visibility(%{visibility: "direct"})
1557 |> order_by([activity], asc: activity.id)