1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Collects the addressing fields ("to", "cc", "bcc") of an activity map into a
# single recipient list; each missing field defaults to [].
#
# "Create" activities: the actor is appended and duplicates are removed.
# Any other activity: the three lists are concatenated as-is (no de-duplication).
#
# NOTE(review): the return expression is not visible in this view; callers in
# this module destructure the result as `{recipients, _, _}`.
# NOTE(review): when "actor" is absent the default `[]` is wrapped as `[[]]`,
# so an empty list becomes an element of `recipients` — confirm this is intended.
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
# Actor gate used by insert/4.
# A nil actor passes; a binary AP ID passes when the cached user has
# `is_active: true`. Remaining case branches are not visible in this view.
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
# Returns true when the embedded object's "content" is no longer than the
# configured `[:instance, :remote_limit]`. Activities without an embedded
# object map carrying non-nil "content" always pass.
#
# NOTE(review): the guard only rules out nil; String.length/1 raises on a
# non-binary value — confirm "content" is always a string at this point.
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
# Delegates to User.increase_note_count/1 when `object` is public (per
# is_public?/1); otherwise returns `{:ok, actor}` without touching the counter.
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Delegates to User.decrease_note_count/1 when `object` is public (per
# is_public?/1); otherwise returns `{:ok, actor}` without touching the counter.
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# For create data whose embedded object has an "inReplyTo", bumps the replies
# counter of the replied-to object, but only when the object is public.
# Any other input is a no-op (`:noop`).
# NOTE(review): part of the first clause's head is not visible in this view.
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
# Types that persist/2 stores through Object.create/1 rather than as an
# Activity row.
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# First clause: plain objects of the types above go through Object.create/1.
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
# Second clause: everything else is inserted as an %Activity{}.
# `meta` must contain :local (Keyword.fetch!/2 raises KeyError otherwise).
# On success the activity's expiration is scheduled (if any) and
# `{:ok, activity, meta}` is returned.
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
# Inserts an activity map, running the checks below in order:
#   1. continue only if Activity.normalize/1 returns nil (a returned
#      %Activity{} is handled in the else branch; its body is not visible here);
#   2. fill in defaults (lazy_put_activity_defaults/2);
#   3. actor check, skipped when `bypass_actor_check` is true;
#   4. remote content-length limit;
#   5. MRF filtering;
#   6. recipient computation;
#   7. `fake` short-circuit: builds an in-memory %Activity{} instead of inserting;
#   8. containment check on the child object;
#   9. insert of the full object, then of the activity (with expiration).
# After a real insert, rich-media data is fetched in a started Task under a
# ConcurrentLimiter.
# A failed remote-limit check yields `{:error, :remote_limit}`; the bodies of
# the other else branches are not visible in this view.
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
# Inserts an activity struct carrying the actor from `data` and the given
# recipients (the rest of the struct construction is not visible here), then
# hands the stored activity to maybe_create_activity_expiration/1.
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
# Post-insert side effects for an activity: creates notifications, creates or
# bumps the conversation on behalf of the activity's actor, and streams the
# conversation's participations.
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
# When the activity's data has a %DateTime{} under "expires_at", enqueues a
# PurgeExpiredActivity job with the activity id and that timestamp.
# Activities without such a value are returned unchanged as `{:ok, activity}`.
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity`; if `actor` resolves to a
# cached user, that user's participation is marked as read.
# NOTE(review): the value returned after mark_as_read is not visible here;
# notify_and_stream/1 passes the result on to get_participations/1.
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
# Given `{:ok, conversation}`, force-reloads the :participations association
# and returns it. Any other input yields [].
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
# Preloads :user on the given participations and pushes them to the
# "participation" stream.
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
# Two-argument variant: for an object with a "context", looks up the matching
# conversation, preloads its participations, and streams them out only when
# `last_activity_id` is truthy. That value presumably comes from the
# fetch_latest_direct_activity_id_for_context/2 call (binding not visible).
# Any other arguments are a no-op (`:noop`).
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by Topics.get_activity_topics/1. Other activities hit the fallback clause,
# whose body is not visible in this view.
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
# Public entry point for creating a "Create" activity: runs do_create/2 inside
# a Repo transaction. `fake` defaults to false.
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body for create/2.
# Requires :to, :actor, :context and :object in `params`; :additional,
# :local and :published are optional. `local` is true unless params[:local]
# is exactly false.
# Flow: insert -> (stop here when fake) -> bump replies count -> (stop here
# when the :env config is :benchmark) -> bump the actor's note count ->
# notify and stream -> federate.
# Unmatched failures are rolled back via Repo.rollback/1.
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
# Builds and inserts a listen activity from `params` (:to, :actor, :context and
# :object are required; :additional, :local and :published are optional), then
# notifies/streams and federates it. `local` is true unless params[:local] is
# exactly false.
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
# Public entry point for unfollowing: runs do_unfollow/4 inside a Repo
# transaction. `activity_id` defaults to nil and `local` to true.
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body for unfollow/4: finds the latest follow activity between
# the two users, sets its state to "cancelled", inserts the unfollow activity
# built from it, then notifies/streams and federates.
# An `{:error, error}` result rolls the transaction back.
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
# Public entry point for filing a report (Flag): runs do_flag/1 inside a Repo
# transaction. NOTE(review): the `def` head is not visible in this view.
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# Body of the flag transaction (the function head is not visible in this view;
# presumably do_flag/1, as called from flag/1).
# `local` and `forward` are true unless explicitly passed as false.
# `additional` gets an empty "to" plus either the reported account's AP ID or
# nothing in "cc"; the condition choosing between the two is not visible.
# After inserting and notifying, the activity stripped of report status data
# is federated, and a report email is queued for every superuser who is not
# the reporting actor and has an email address.
# An `{:error, error}` result rolls the transaction back.
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
390 {:error, error} -> Repo.rollback(error)
# Announces that `origin` has moved to `target`.
# Proceeds only if the origin's AP ID is listed in `target.also_known_as`;
# otherwise returns an `{:error, message}` tuple. On success the activity is
# inserted, notified/streamed and federated, and a "move_following"
# background job is enqueued with both user ids.
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id
403 with true <- origin.ap_id in target.also_known_as,
404 {:ok, activity} <- insert(params, local),
405 _ <- notify_and_stream(activity) do
406 maybe_federate(activity)
408 BackgroundWorker.enqueue("move_following", %{
409 "origin_id" => origin.id,
410 "target_id" => target.id
415 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities in a given context (thread).
# One visible branch builds recipients as opts[:user]'s AP ID, everyone they
# follow and the public address; the condition and alternative are not visible.
# The query applies preloads, block/recipient/filter restrictions, a fragment
# on the activity's type and context, poll-vote exclusion, and orders by id
# descending.
420 def fetch_activities_for_context_query(context, opts) do
421 public = [Constants.as_public()]
425 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
428 from(activity in Activity)
429 |> maybe_preload_objects(opts)
430 |> maybe_preload_bookmarks(opts)
431 |> maybe_set_thread_muted_field(opts)
432 |> restrict_blocked(opts)
433 |> restrict_recipients(recipients, opts[:user])
434 |> restrict_filtered(opts)
438 "?->>'type' = ? and ?->>'context' = ?",
445 |> exclude_poll_votes(opts)
447 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using
# fetch_activities_for_context_query/2. The remaining pipeline steps are not
# visible in this view.
450 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
451 def fetch_activities_for_context(context, opts \\ %{}) do
453 |> fetch_activities_for_context_query(opts)
# Per the spec, returns an activity id or nil for the given context.
# Builds on fetch_activities_for_context_query/2 with `skip_preload: true` as
# the default (caller opts win in the merge) and restricts to "direct"
# visibility. The final selection steps are not visible in this view.
457 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
458 FlakeId.Ecto.CompatType.t() | nil
459 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
461 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
462 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection.
# :user is dropped from `opts` before querying; unlisted activities are
# filtered out only when opts has `restrict_unlisted: true`.
468 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
469 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
470 opts = Map.delete(opts, :user)
472 [Constants.as_public()]
473 |> fetch_activities_query(opts)
474 |> restrict_unlisted(opts)
475 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but forces
# `restrict_unlisted: true`, so unlisted activities are excluded.
478 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
479 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
481 |> Map.put(:restrict_unlisted, true)
482 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
485 @valid_visibilities ~w[direct unlisted public private]
# Restricts the query to activities whose computed visibility (SQL function
# activity_visibility) matches opts[:visibility], given either as a list of
# names or a single name. Values outside @valid_visibilities are logged as an
# error; what those branches return is not visible in this view.
# Opts without :visibility leave the query unchanged.
487 defp restrict_visibility(query, %{visibility: visibility})
488 when is_list(visibility) do
489 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
494 "activity_visibility(?, ?, ?) = ANY (?)",
502 Logger.error("Could not restrict visibility to #{visibility}")
506 defp restrict_visibility(query, %{visibility: visibility})
507 when visibility in @valid_visibilities do
511 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
515 defp restrict_visibility(_query, %{visibility: visibility})
516 when visibility not in @valid_visibilities do
517 Logger.error("Could not restrict visibility to #{visibility}")
520 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by opts[:exclude_visibilities]
# (a list of names or a single name). Invalid values are logged as an error;
# a nil value is not treated as invalid. Opts without :exclude_visibilities
# leave the query unchanged.
# NOTE(review): the negation around the fragments is not visible in this view.
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when is_list(visibility) do
524 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
529 "activity_visibility(?, ?, ?) = ANY (?)",
537 Logger.error("Could not exclude visibility to #{visibility}")
542 defp exclude_visibility(query, %{exclude_visibilities: visibility})
543 when visibility in @valid_visibilities do
548 "activity_visibility(?, ?, ?) = ?",
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility not in [nil | @valid_visibilities] do
559 Logger.error("Could not exclude visibility to #{visibility}")
563 defp exclude_visibility(query, _visibility), do: query
# Applies the SQL thread_visibility check for the reading user's AP ID.
# The first two clauses match when thread containment is skipped, either in
# the config map (third argument) or on the user; their bodies are not
# visible here. Without a user in opts the query is returned unchanged.
565 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
568 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
571 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
574 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
578 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`.
# Sets :user and :actor_id in params, derives recipients through
# user_activities_recipients/1 (honouring params[:godmode]), and delegates to
# fetch_activities/2. Unlike fetch_user_activities/3, no :type restriction is
# set in the visible lines.
580 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
583 |> Map.put(:user, reading_user)
584 |> Map.put(:actor_id, user.ap_id)
587 godmode: params[:godmode],
588 reading_user: reading_user
590 |> user_activities_recipients()
591 |> fetch_activities(params)
# Fetches a user's activities through fetch_activities_for_user/3.
# With `total: true` in params the result is treated as a keyword list and its
# :items are reversed. The post-processing of the plain variant is not
# visible in this view.
595 def fetch_user_activities(user, reading_user, params \\ %{})
597 def fetch_user_activities(user, reading_user, %{total: true} = params) do
598 result = fetch_activities_for_user(user, reading_user, params)
600 Keyword.put(result, :items, Enum.reverse(result[:items]))
603 def fetch_user_activities(user, reading_user, params) do
605 |> fetch_activities_for_user(reading_user, params)
# Shared implementation behind fetch_user_activities/3.
# Restricts to "Create"/"Announce" activities by `user`, records the reading
# user and the user's pinned activity ids, and branches on whether the reading
# user blocks `user`. One branch sets :blocking_user and :muting_user; which
# branch that is cannot be seen in this view.
# The pagination type comes from params[:pagination_type], defaulting to
# :keyset.
609 defp fetch_activities_for_user(user, reading_user, params) do
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
618 if User.blocks?(reading_user, user) do
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
626 pagination_type = Map.get(params, :pagination_type) || :keyset
629 godmode: params[:godmode],
630 reading_user: reading_user
632 |> user_activities_recipients()
633 |> fetch_activities(params, pagination_type)
# Fetches statuses visible to `reading_user` through
# fetch_activities_for_reading_user/2. With `total: true` the :items of the
# keyword result are reversed; the plain variant's post-processing is not
# visible in this view.
636 def fetch_statuses(reading_user, %{total: true} = params) do
637 result = fetch_activities_for_reading_user(reading_user, params)
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
641 def fetch_statuses(reading_user, params) do
643 |> fetch_activities_for_reading_user(params)
# Restricts to "Create"/"Announce" activities, derives recipients for the
# reading user (honouring params[:godmode]) and fetches with :offset
# pagination.
647 defp fetch_activities_for_reading_user(reading_user, params) do
648 params = Map.put(params, :type, ["Create", "Announce"])
651 godmode: params[:godmode],
652 reading_user: reading_user
654 |> user_activities_recipients()
655 |> fetch_activities(params, :offset)
# Recipient list used when reading a user's activities.
# godmode: [] (restrict_recipients/3 applies no filter for an empty list).
# Otherwise either the public address plus the reading user's own AP ID and
# followings, or the public address only; the condition selecting between
# them is not visible here (presumably whether a reading user is present).
658 defp user_activities_recipients(%{godmode: true}), do: []
660 defp user_activities_recipients(%{reading_user: reading_user}) do
662 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
664 [Constants.as_public()]
# Filter driven by opts[:announce_filtering_user], using a fragment of the form
# "type != ? or actor != ?" over the activity and its joined object.
# Needs the joined object, so it raises when combined with
# `skip_preload: true`. Without the option the query is unchanged.
668 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
669 raise "Can't use the child object without preloading!"
672 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
674 [activity, object] in query,
677 "?->>'type' != ? or ?->>'actor' != ?",
686 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities with an id greater than opts[:since_id].
# An empty-string or absent since_id leaves the query unchanged.
688 defp restrict_since(query, %{since_id: ""}), do: query
690 defp restrict_since(query, %{since_id: since_id}) do
691 from(activity in query, where: activity.id > ^since_id)
694 defp restrict_since(query, _), do: query
# Drops activities whose object's 'tag' JSON contains any of the tags in
# opts[:tag_reject] (non-empty list). Raises when combined with
# `skip_preload: true` because the joined object is required.
696 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
697 raise "Can't use the child object without preloading!"
700 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
702 [_activity, object] in query,
703 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
707 defp restrict_tag_reject(query, _), do: query
# Keeps activities whose object's 'tag' JSON contains all of the tags in
# opts[:tag_all] (non-empty list). Raises when combined with
# `skip_preload: true` because the joined object is required.
709 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
710 raise "Can't use the child object without preloading!"
713 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
715 [_activity, object] in query,
716 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
720 defp restrict_tag_all(query, _), do: query
# Keeps activities whose object's 'tag' JSON contains opts[:tag]: any of the
# tags when given a list, or the single tag when given a binary.
# Raises when combined with `skip_preload: true` because the joined object is
# required.
722 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
723 raise "Can't use the child object without preloading!"
726 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
728 [_activity, object] in query,
729 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
733 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
735 [_activity, object] in query,
736 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
740 defp restrict_tag(query, _), do: query
# Keeps activities whose recipients overlap (`&&`) with the given list.
# An empty list applies no restriction. When a user is given, activities
# authored by that user are also kept (or_where on the actor).
742 defp restrict_recipients(query, [], _user), do: query
744 defp restrict_recipients(query, recipients, nil) do
745 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
748 defp restrict_recipients(query, recipients, user) do
751 where: fragment("? && ?", ^recipients, activity.recipients),
752 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged as local.
756 defp restrict_local(query, %{local_only: true}) do
757 from(activity in query, where: activity.local == true)
760 defp restrict_local(query, _), do: query
# With `remote: true`, keeps only activities that are not flagged as local.
762 defp restrict_remote(query, %{remote: true}) do
763 from(activity in query, where: activity.local == false)
766 defp restrict_remote(query, _), do: query
# With :actor_id in opts, keeps only activities authored by that actor.
768 defp restrict_actor(query, %{actor_id: actor_id}) do
769 from(activity in query, where: activity.actor == ^actor_id)
772 defp restrict_actor(query, _), do: query
# Restricts by the activity's "type": exact match for a binary, `= ANY(...)`
# for any other value (expected to be a list of types).
774 defp restrict_type(query, %{type: type}) when is_binary(type) do
775 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
778 defp restrict_type(query, %{type: type}) do
779 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
782 defp restrict_type(query, _), do: query
# With :state in opts, keeps only activities whose data "state" equals it.
784 defp restrict_state(query, %{state: state}) do
785 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
788 defp restrict_state(query, _), do: query
# With :favorited_by in opts, keeps only activities whose object's 'likes'
# JSON contains the given AP ID.
790 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
792 [_activity, object] in query,
793 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
797 defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps only "Create" activities whose object's
# 'attachment' is not equal to an empty list. Raises whenever :only_media is
# combined with `skip_preload: true`, because the joined object is required.
799 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
800 raise "Can't use the child object without preloading!"
803 defp restrict_media(query, %{only_media: true}) do
805 [activity, object] in query,
806 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
807 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
811 defp restrict_media(query, _), do: query
# Reply filtering.
#   exclude_replies: true        -> only objects without "inReplyTo".
#   reply_visibility "self"      -> non-replies, or a match against the
#                                   filtering user (fragment args not visible).
#   reply_visibility "following" -> see the SQL notes embedded in the fragment
#                                   below; the list bound to it is the user's
#                                   own AP ID plus their cached friends' AP IDs.
# Any other opts leave the query unchanged.
813 defp restrict_replies(query, %{exclude_replies: true}) do
815 [_activity, object] in query,
816 where: fragment("?->>'inReplyTo' is null", object.data)
820 defp restrict_replies(query, %{
821 reply_filtering_user: %User{} = user,
822 reply_visibility: "self"
825 [activity, object] in query,
828 "?->>'inReplyTo' is null OR ? = ANY(?)",
836 defp restrict_replies(query, %{
837 reply_filtering_user: %User{} = user,
838 reply_visibility: "following"
841 [activity, object] in query,
845 ?->>'type' != 'Create' -- This isn't a Create
846 OR ?->>'inReplyTo' is null -- this isn't a reply
847 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
848 -- unless they are the author (because authors
849 -- are also part of the recipients). This leads
850 -- to a bug that self-replies by friends won't
852 OR ? = ? -- The actor is us
856 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
865 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities of type "Announce".
867 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
868 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
871 defp restrict_reblogs(query, _), do: query
# Hides activities from actors muted by opts[:muting_user], unless
# `with_muted: true` is set. The muted AP IDs come from
# opts[:muted_users_ap_ids] when present, otherwise from
# User.muted_users_ap_ids/1. When preloading is not skipped, a filter on the
# :thread_mute binding (rows with a nil user_id) is also applied.
873 defp restrict_muted(query, %{with_muted: true}), do: query
875 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
876 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
879 from([activity] in query,
880 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
883 "not (?->'to' \\?| ?) or ? = ?",
891 unless opts[:skip_preload] do
892 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
898 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by opts[:blocking_user].
# Blocked AP IDs come from opts[:blocked_users_ap_ids] when present, otherwise
# from User.blocked_users_ap_ids/1; domain blocks come from the user record.
# The object is joined if the query has no :object binding yet.
# The fragments cover the activity actor, recipients, recipient domains,
# Announce targets, and the actor/object-actor domains; the last two have an
# `= ANY(?)` exception (presumably bound to `following_ap_ids`; the bindings
# are not visible here).
# Without a blocking user the query is unchanged.
900 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
901 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
902 domain_blocks = user.domain_blocks || []
904 following_ap_ids = User.get_friends_ap_ids(user)
907 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
910 [activity, object: o] in query,
911 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
914 "((not (? && ?)) or ? = ?)",
922 "recipients_contain_blocked_domains(?, ?) = false",
928 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
935 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
943 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
952 defp restrict_blocked(query, _), do: query
# With `restrict_unlisted: true`, drops activities whose "cc" (treated as an
# empty JSON value when missing) contains the public address.
954 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
959 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
961 ^[Constants.as_public()]
966 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and :pinned_activity_ids, keeps only activities whose id
# is in that list.
968 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
969 from(activity in query, where: activity.id in ^ids)
972 defp restrict_pinned(query, _), do: query
# Hides "Announce" activities from actors whose reblogs are muted by
# opts[:muting_user]. The AP IDs come from opts[:reblog_muted_users_ap_ids]
# when present, otherwise from User.reblog_muted_users_ap_ids/1.
974 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
975 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
981 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
989 defp restrict_muted_reblogs(query, _), do: query
# With a binary :instance in opts, keeps only activities whose actor URL has
# that value as its third '/'-separated part (the host portion of the URL).
991 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
994 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
998 defp restrict_instance(query, _), do: query
# Applies the user's content filters: Filter.compose_regex/1 builds a regex
# and objects whose "content" matches it (case-insensitively, `~*`) are
# dropped, except for activities authored by the user. The case branches are
# only partly visible here.
# opts with :blocking_user but no :user reuse the same logic; any other opts
# leave the query unchanged.
1000 defp restrict_filtered(query, %{user: %User{} = user}) do
1001 case Filter.compose_regex(user) do
1006 from([activity, object] in query,
1008 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1009 activity.actor == ^user.ap_id
1014 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1015 restrict_filtered(query, %{user: user})
1018 defp restrict_filtered(query, _), do: query
# Unless `include_poll_votes: true`, drops activities whose joined object is of
# type "Answer". The filter is added only when the query has an :object
# binding; the else branch is not visible in this view.
1020 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1022 defp exclude_poll_votes(query, _) do
1023 if has_named_binding?(query, :object) do
1024 from([activity, object: o] in query,
1025 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Unless `include_chat_messages: true`, drops activities whose joined object is
# of type "ChatMessage". The filter is added only when the query has an
# :object binding; the else branch is not visible in this view.
1032 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1034 defp exclude_chat_messages(query, _) do
1035 if has_named_binding?(query, :object) do
1036 from([activity, object: o] in query,
1037 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Unless `invisible_actors: true`, drops activities authored by users found by
# the `invisible: true` user query. The AP IDs are collected into a list first
# and bound into the activity query as `not in`.
1044 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1046 defp exclude_invisible_actors(query, _opts) do
1048 User.Query.build(%{invisible: true, select: [:ap_id]})
1050 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1052 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id in opts, drops the activity with that id.
1055 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1056 from(activity in query, where: activity.id != ^id)
1059 defp exclude_id(query, _), do: query
# Preloads the activity's object via Activity.with_preloaded_object/1 unless
# `skip_preload: true` is set.
1061 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1063 defp maybe_preload_objects(query, _) do
1065 |> Activity.with_preloaded_object()
# Preloads the bookmark for opts[:user] via Activity.with_preloaded_bookmark/2
# unless `skip_preload: true` is set.
1068 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1070 defp maybe_preload_bookmarks(query, opts) do
1072 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes via Activity.with_preloaded_report_notes/1 only when
# `preload_report_notes: true` is set.
1075 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1077 |> Activity.with_preloaded_report_notes()
1080 defp maybe_preload_report_notes(query, _), do: query
# Unless `skip_preload: true`, passes the query through
# Activity.with_set_thread_muted_field/2 for opts[:muting_user], falling back
# to opts[:user].
1082 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1084 defp maybe_set_thread_muted_field(query, opts) do
1086 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id according to opts[:order] (:desc or :asc); any other opts leave
# the query's ordering untouched.
1089 defp maybe_order(query, %{order: :desc}) do
1091 |> order_by(desc: :id)
1094 defp maybe_order(query, %{order: :asc}) do
1096 |> order_by(asc: :id)
1099 defp maybe_order(query, _), do: query
# Preloads the AP IDs needed by the block/mute/reblog-mute filters in a single
# User.outgoing_relationships_ap_ids/2 call and returns three opts maps
# `{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}`.
# The source user is opts[:muting_user]; :block is included only when
# opts[:blocking_user] is that same user.
# In each Map.merge/2 the caller's opts come second, so values already present
# in opts take precedence over the preloaded ones.
1101 defp fetch_activities_query_ap_ids_ops(opts) do
1102 source_user = opts[:muting_user]
1103 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1105 ap_id_relationships =
1106 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1107 [:block | ap_id_relationships]
1112 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1114 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1115 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1117 restrict_muted_reblogs_opts =
1118 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1120 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query for `recipients`, applying every preload,
# restrict_* and exclude_* step below. Each step is a no-op unless its option
# is present in `opts`. Block/mute steps receive opts enriched with preloaded
# AP IDs from fetch_activities_query_ap_ids_ops/1. The thread-containment step
# reads `[:instance, :skip_thread_containment]` from config.
#
# NOTE(review): restrict_filtered/2 appears twice in this pipeline with the
# same opts; the second call looks redundant — confirm before removing.
1123 def fetch_activities_query(recipients, opts \\ %{}) do
1124 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1125 fetch_activities_query_ap_ids_ops(opts)
1128 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1132 |> maybe_preload_objects(opts)
1133 |> maybe_preload_bookmarks(opts)
1134 |> maybe_preload_report_notes(opts)
1135 |> maybe_set_thread_muted_field(opts)
1136 |> maybe_order(opts)
1137 |> restrict_recipients(recipients, opts[:user])
1138 |> restrict_replies(opts)
1139 |> restrict_tag(opts)
1140 |> restrict_tag_reject(opts)
1141 |> restrict_tag_all(opts)
1142 |> restrict_since(opts)
1143 |> restrict_local(opts)
1144 |> restrict_remote(opts)
1145 |> restrict_actor(opts)
1146 |> restrict_type(opts)
1147 |> restrict_state(opts)
1148 |> restrict_favorited_by(opts)
1149 |> restrict_blocked(restrict_blocked_opts)
1150 |> restrict_muted(restrict_muted_opts)
1151 |> restrict_filtered(opts)
1152 |> restrict_media(opts)
1153 |> restrict_visibility(opts)
1154 |> restrict_thread_visibility(opts, config)
1155 |> restrict_reblogs(opts)
1156 |> restrict_pinned(opts)
1157 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1158 |> restrict_instance(opts)
1159 |> restrict_announce_object_actor(opts)
1160 |> restrict_filtered(opts)
1161 |> Activity.restrict_deactivated_users()
1162 |> exclude_poll_votes(opts)
1163 |> exclude_chat_messages(opts)
1164 |> exclude_invisible_actors(opts)
1165 |> exclude_visibility(opts)
# Fetches a page of activities for `recipients`.
# The list memberships of opts[:user] are appended to the recipients before
# querying, and are used afterwards by maybe_update_cc/3 to adjust "cc" on
# list-addressed activities. One pipeline step between pagination and
# maybe_update_cc/3 is not visible in this view.
1168 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1169 list_memberships = Pleroma.List.memberships(opts[:user])
1171 fetch_activities_query(recipients ++ list_memberships, opts)
1172 |> Pagination.fetch_paginated(opts, pagination)
1174 |> maybe_update_cc(list_memberships, opts[:user])
1178 Fetches the activities a user has favourited, ordered by when they were added to favourites (most recent first).
# Query shape: "Like" activities by the actor, joined to the liked object and
# then to that object's activity. The selected row is the liked activity with
# its object attached and `pagination_id` set to the Like's id, so pagination
# follows the Like order (descending, nulls last). `skip_order: true` is
# merged into the params passed to the paginator.
1180 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1181 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1183 |> Activity.Queries.by_actor()
1184 |> Activity.Queries.by_type("Like")
1185 |> Activity.with_joined_object()
1186 |> Object.with_joined_activity()
1187 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1188 |> order_by([like, _, _], desc_nulls_last: like.id)
1189 |> Pagination.fetch_paginated(
1190 Map.merge(params, %{skip_order: true}),
# For a user with at least one list membership: every activity with a
# non-empty "bcc" that contains one of those memberships gets the user's AP ID
# prepended to its "cc". The remaining branches are not visible in this view.
# With no memberships or no user the activities are returned unchanged.
1195 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1196 Enum.map(activities, fn
1197 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1198 if Enum.any?(bcc, &(&1 in list_memberships)) do
1199 update_in(activity.data["cc"], &[user_ap_id | &1])
1209 defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities that either overlap `recipients`, or overlap
# `recipients_with_public` while also being addressed to the public address.
1211 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1212 from(activity in query,
1214 fragment("? && ?", activity.recipients, ^recipients) or
1215 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1216 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities using the bounded recipient rule of
# fetch_activities_bounded_query/3. The base query is built with an empty
# recipient list, so all recipient filtering comes from the bounded rule.
# Pagination defaults to :keyset. Part of the parameter list is not visible.
1220 def fetch_activities_bounded(
1222 recipients_with_public,
1224 pagination \\ :keyset
1226 fetch_activities_query([], opts)
1227 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1228 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` through Upload.store/2 and inserts an %Object{} whose data is
# the stored upload data, with "actor" added when opts[:actor] is present
# (Maps.put_if_present/3).
1232 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1233 def upload(file, opts \\ []) do
1234 with {:ok, data} <- Upload.store(file, opts) do
1235 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1237 Repo.insert(%Object{data: obj_data})
# Normalises an actor's "url" field to a single binary or nil.
# Accepts a plain string, a map with a binary "href", or a list (the list
# handling is not visible in this view). Anything else yields nil.
1241 @spec get_actor_url(any()) :: binary() | nil
1242 defp get_actor_url(url) when is_binary(url), do: url
1243 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1245 defp get_actor_url(url) when is_list(url) do
1251 defp get_actor_url(_url), do: nil
# Converts a remote actor document into the map of user attributes used by
# this module (avatar/banner, profile fields, emoji, flags, addresses, public
# key, inboxes, nickname).
# Defaults visible here: is_locked/is_discoverable/invisible -> false,
# actor_type -> "Person", bio -> "", also_known_as -> [].
# public_key and shared_inbox are read only when the nested value is a binary.
# The nickname is "<preferredUsername>@<host of id>", or nil when there is no
# preferredUsername.
#
# NOTE(review): `data` is remote input. The anonymous functions over
# "attachment" and over "Emoji" tags match strict map shapes, so an entry
# without "type" (or an emoji tag without "icon"/"url"/"name") would raise
# FunctionClauseError; `data["icon"]["url"]` likewise assumes "icon" is a map
# or nil. Confirm upstream validation guarantees these shapes.
1253 defp object_to_user_data(data) do
1255 data["icon"]["url"] &&
1258 "url" => [%{"href" => data["icon"]["url"]}]
1262 data["image"]["url"] &&
1265 "url" => [%{"href" => data["image"]["url"]}]
1270 |> Map.get("attachment", [])
1271 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1272 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1276 |> Map.get("tag", [])
1278 %{"type" => "Emoji"} -> true
1281 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1282 {String.trim(name, ":"), url}
1285 is_locked = data["manuallyApprovesFollowers"] || false
1286 capabilities = data["capabilities"] || %{}
1287 accepts_chat_messages = capabilities["acceptsChatMessages"]
1288 data = Transmogrifier.maybe_fix_user_object(data)
1289 is_discoverable = data["discoverable"] || false
1290 invisible = data["invisible"] || false
1291 actor_type = data["type"] || "Person"
1294 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1295 data["publicKey"]["publicKeyPem"]
1301 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1302 data["endpoints"]["sharedInbox"]
1309 uri: get_actor_url(data["url"]),
1314 is_locked: is_locked,
1315 is_discoverable: is_discoverable,
1316 invisible: invisible,
1319 follower_address: data["followers"],
1320 following_address: data["following"],
1321 bio: data["summary"] || "",
1322 actor_type: actor_type,
1323 also_known_as: Map.get(data, "alsoKnownAs", []),
1324 public_key: public_key,
1325 inbox: data["inbox"],
1326 shared_inbox: shared_inbox,
1327 accepts_chat_messages: accepts_chat_messages
1330 # nickname can be nil because of virtual actors
1331 if data["preferredUsername"] do
1335 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1338 Map.put(user_data, :nickname, nil)
# Fetches the user's remote following and follower collections and derives:
#   hide_follows / hide_followers     - from collection_private/1,
#   following_count / follower_count  - from "totalItems" (0 if not an integer).
# `{:error, _}` results from any step are returned as-is.
1342 def fetch_follow_information_for_user(user) do
1343 with {:ok, following_data} <-
1344 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1345 {:ok, hide_follows} <- collection_private(following_data),
1346 {:ok, followers_data} <-
1347 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1348 {:ok, hide_followers} <- collection_private(followers_data) do
1351 hide_follows: hide_follows,
1352 follower_count: normalize_counter(followers_data["totalItems"]),
1353 following_count: normalize_counter(following_data["totalItems"]),
1354 hide_followers: hide_followers
1357 {:error, _} = e -> e
# Coerces a remote collection's "totalItems" value into a usable counter.
#
# Only non-negative integers are trusted. Anything else (missing value,
# string, float, or a negative number sent by a misbehaving remote) becomes 0,
# so a bogus payload can never produce a negative follower/following count.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
@doc """
Enriches `user_data` with follower/following information when possible.

The remote collections are only fetched when all of the following hold:

  * `[:instance, :external_user_synchronization]` is configured as `true`,
  * `user_data[:type]` is `"Person"` or `"Service"`,
  * both `:following_address` and `:follower_address` are present.

On success the fetched information is merged into the map stored under
`:info`. In every other case `user_data` is returned unchanged; failures that
are not one of the expected "skip" conditions are logged at error level.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the visible part of object_to_user_data/1 sets :actor_type,
  # not :type - confirm that this type check can actually succeed.
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged)
  else
    # Expected reasons to skip the update: stay silent.
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
# Decides whether a remote collection hides its items.
#
# Returns `{:ok, false}` when the first page is readable, `{:ok, true}` when
# the collection is considered private, and `{:error, reason}` otherwise.

# The first page is embedded and is a proper page: the collection is open.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

# The first page is only referenced: fetch it to see whether it is readable.
defp collection_private(%{"first" => first_page}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first_page) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    # The remote refuses access to the page: treat the collection as private.
    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

# No "first" page at all: nothing can be listed, so treat it as private.
defp collection_private(_data), do: {:ok, true}
@doc """
Runs a remote actor object through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, _}`; any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
@doc """
Fetches the remote actor at `ap_id` and turns it into user data.

Returns `{:ok, user_data}` (after `maybe_update_follow_information/1` has been
applied) or the `{:error, reason}` produced by the failing step. Each failure
is logged before it is returned; the log level depends on the reason.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = failure ->
      log_user_fetch_failure(ap_id, reason)
      failure
  end
end

# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = reason),
  do: Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

# A rejection is an expected outcome, so it is only worth an info line.
defp log_user_fetch_failure(ap_id, {:reject, reason}),
  do: Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

defp log_user_fetch_failure(ap_id, reason),
  do: Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
@doc """
Frees up the nickname in `data` when another stored user already holds it.

If a user with the same nickname but a different AP id exists, that old user
is renamed to `"<id>.<nickname>"`. If the AP ids are equal nothing is changed
and the situation is only logged. Returns `nil` when `data` has no binary
nickname or no user with that nickname is found.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      # Prefix the old nickname with the old user's id to release the name.
      renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}
      changeset = User.remote_user_changeset(old_user, renamed)
      User.update_and_set_cache(changeset)
    end
  else
    _ -> nil
  end
end
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

  * A cached user for which `User.ap_enabled?/1` is false is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Otherwise the actor is fetched; an existing user is updated with the new
    data, while an unknown one is inserted after any nickname clash has been
    handled.

A failed fetch is returned as-is.
"""
def make_user_from_ap_id(ap_id) do
  cached = User.get_cached_by_ap_id(ap_id)

  cond do
    cached && !User.ap_enabled?(cached) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if cached do
            changeset = User.remote_user_changeset(cached, data)
            User.update_and_set_cache(changeset)
          else
            # Make room for the new user's nickname before inserting it.
            maybe_handle_clashing_nickname(data)

            inserted = Repo.insert(User.remote_user_changeset(data))
            User.set_cache(inserted)
          end

        failure ->
          failure
      end
  end
end
@doc """
Resolves `nickname` through WebFinger and builds the user from its AP id.

Returns whatever `make_user_from_ap_id/1` returns, or
`{:error, "No AP id in WebFinger"}` when the lookup fails or yields no AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: the result is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Does post-processing on a specific activity on behalf of `user`.

The only step at the moment is the broken-thread check, so the result is the
value returned by `contain_broken_threads/2`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1506 def fetch_direct_messages_query do
1508 |> restrict_type(%{type: "Create"})
1509 |> restrict_visibility(%{visibility: "direct"})
1510 |> order_by([activity], asc: activity.id)