1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Builds the recipient list for a "Create" activity from its "to", "cc" and
# "bcc" fields (each defaulting to []) plus the actor, with duplicates removed.
# Note: when "actor" is absent the default [] is wrapped and ends up as an
# element of the list.
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Any other activity: recipients are "to" ++ "cc" ++ "bcc"; the actor is not
# added and duplicates are kept.
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
# An activity without an actor passes the check.
55 defp check_actor_is_active(nil), do: true
# For an AP id string the user is looked up through the cache; a user whose
# `is_active` flag is true passes.
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
# Checks that the object's "content" does not exceed the configured
# `[:instance, :remote_limit]` (measured with `String.length/1`).
#
# Only binary content is measured: `String.length/1` raises on any other term,
# and the activity data may originate from another server, so a non-string
# "content" falls through to the permissive clause instead of crashing the
# caller.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

# Nothing measurable (no embedded object, no content, or non-string content).
defp check_remote_limit(_), do: true
# Increments the actor's note count when `object` is public.
# Returns the result of `User.increase_note_count/1`, or `{:ok, actor}`
# unchanged when the object is not public.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Decrements the actor's note count when `object` is public.
# Returns the result of `User.decrease_note_count/1`, or `{:ok, actor}`
# unchanged when the object is not public.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
# Increments the replies counter of the object referenced by "inReplyTo",
# but only when the replying object is public.
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
# Data that does not match the pattern above is ignored.
88 defp increase_replies_count_if_reply(_create_data), do: :noop
# Types that the first persist/2 clause stores through Object.create/1.
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# Persists plain objects of one of the @object_types.
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
# Persists everything else as an Activity row. `meta` must contain :local
# (Keyword.fetch!/2 raises otherwise). On success returns
# {:ok, activity, meta}.
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
# Inserts an activity map. Steps visible in the `with` chain, in order:
#   1. continue only if Activity.normalize/1 finds no existing activity
#      (an existing %Activity{} is handled in the else branch),
#   2. fill in defaults (lazy_put_activity_defaults/2),
#   3. actor-is-active check, skipped when `bypass_actor_check` is truthy,
#   4. remote content length check,
#   5. MRF filtering,
#   6. recipient extraction,
#   7. short-circuit for `fake` activities (built in memory in the else branch),
#   8. containment check of the child object,
#   9. object insertion, then activity insertion with expiration.
# A failed remote limit check yields {:error, :remote_limit}.
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
# Rich media data is fetched in a separately started task, wrapped in the
# concurrent limiter.
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
# Inserts the activity struct and, on success, hands it to
# maybe_create_activity_expiration/1.
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
# Side effects after an activity is stored: creates notifications, creates or
# bumps the conversation for the activity's actor, and streams the resulting
# participations.
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
# When the activity data carries a %DateTime{} under "expires_at", a
# PurgeExpiredActivity job is enqueued with the activity id and expiry time.
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
# Activities without such an "expires_at" are returned unchanged.
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity` and, if the actor resolves
# to a cached user, marks that user's participation as read.
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
# Returns the participations of a successfully created/bumped conversation.
# The association is always reloaded (`force: true`) so a previously loaded
# list is not reused.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

# Any other input yields no participations.
defp get_participations(_), do: []
# Arity 1: preloads each participation's user and pushes the list to the
# "participation" stream.
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
# Arity 2: resolves the conversation from the object's "context", preloads its
# participations and streams them when a latest direct activity id is found
# for that context.
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
# Objects without a "context" are ignored.
246 def stream_out_participations(_, _), do: :noop
# Streams Create, Announce and Delete activities to the topics computed by
# Topics.get_activity_topics/1.
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
# Fallback clause for every other activity.
257 def stream_out(_activity) do
# Public entry point for creating a "Create" activity; the work is done by
# do_create/2 inside a database transaction.
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Builds and inserts the Create activity, then (for real, non-benchmark
# inserts) updates the reply and note counters, notifies/streams and
# federates. `fake` inserts and "quick inserts" (when the configured :env is
# :benchmark) leave the chain early via the tagged tuples handled in the else
# branch. Other failures roll the transaction back.
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
# Inserts a listen activity built from `params`, then notifies/streams and
# federates it. `:local` defaults to true unless it is exactly `false`.
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
# Public entry point for unfollowing; do_unfollow/4 runs inside a database
# transaction.
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Looks up the latest follow activity, marks it "cancelled", inserts the
# unfollow activity, then notifies/streams and federates it. An {:error, _}
# result rolls the surrounding transaction back.
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
# Public entry point for reports (flags); do_flag/1 runs inside a database
# transaction.
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# Body of the flag implementation: `:local` and `:forward` both default to
# true unless exactly `false`. The "cc" of the flag is either the reported
# account's AP id or empty. After inserting, the report is federated in its
# stripped form and every superuser that has an email address receives a
# report email. An {:error, _} result rolls the transaction back.
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
389 {:error, error} -> Repo.rollback(error)
# Inserts a Move activity from `origin` to `target`. The move is only allowed
# when the origin's AP id is listed in the target's `also_known_as`; otherwise
# an {:error, message} tuple is returned. On success the activity is
# federated and a "move_following" background job is enqueued.
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for the activities of one context (thread): restricted by
# blocks, recipients and user filters, matched on activity type and context,
# without poll votes, newest first. When a user is given, the recipient list
# includes the user's AP id and followings in addition to the public address.
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
437 "?->>'type' = ? and ?->>'context' = ?",
444 |> exclude_poll_votes(opts)
446 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using
# fetch_activities_for_context_query/2.
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(opts)
# Looks up an activity id among the direct-visibility activities of a
# context. Preloading is skipped by default (`skip_preload: true`), though
# `opts` can override it because it is merged last.
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection.
# The `:user` option is dropped before querying; unlisted activities are
# filtered out only when `opts` asks for it (see restrict_unlisted/2).
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
# Same as fetch_public_or_unlisted_activities/2 but always requests that
# unlisted activities be filtered out.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
484 @valid_visibilities ~w[direct unlisted public private]
# List form: every entry must be a valid visibility; the query is then
# restricted with `activity_visibility(...) = ANY (...)`. Otherwise an error
# is logged.
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
493 "activity_visibility(?, ?, ?) = ANY (?)",
501 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: restricts to activities whose computed visibility
# equals it.
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Unknown visibility value: log it.
#
# `nil` is excluded from this clause so that an explicit `visibility: nil`
# means "no filter" and reaches the pass-through clause below, matching the
# `[nil | @valid_visibilities]` guard used by exclude_visibility/2.
#
# `inspect/1` is used because the value can be any term here, and string
# interpolation raises for terms that do not implement `String.Chars`.
# NOTE(review): this clause returns the result of `Logger.error/1`, not a
# query — confirm that callers are meant to fail on an invalid visibility.
defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
end

# No visibility requested: leave the query untouched.
defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by the :exclude_visibilities
# option. List form: all entries must be valid visibilities, otherwise an
# error is logged.
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 "activity_visibility(?, ?, ?) = ANY (?)",
536 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
547 "activity_visibility(?, ?, ?) = ?",
# Any other non-nil value is logged as an error; nil is not treated as invalid.
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
# No exclusion requested.
562 defp exclude_visibility(query, _visibility), do: query
# Applies the `thread_visibility(ap_id, activity id)` SQL check for the
# requesting user. The first two clauses match when thread containment is
# switched off, either in the passed config or on the user.
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
# Without a user there is nothing to check against.
577 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` (actor_id is set to the user's AP id)
# as seen by `reading_user`. Recipients are derived by
# user_activities_recipients/1 from the :godmode param and the reading user.
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
586 godmode: params[:godmode],
587 reading_user: reading_user
589 |> user_activities_recipients()
590 |> fetch_activities(params)
# Fetches the Create and Announce activities of `user` as seen by
# `reading_user`, passing along the user's pinned activity ids. Whether the
# reading user blocks `user` influences the params (see the User.blocks?/2
# branch). Pagination defaults to :keyset unless params[:pagination_type] is
# set.
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
603 if User.blocks?(reading_user, user) do
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
611 pagination_type = Map.get(params, :pagination_type) || :keyset
614 godmode: params[:godmode],
615 reading_user: reading_user
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
# Fetches Create and Announce activities visible to `reading_user`, using
# offset pagination.
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
626 godmode: params[:godmode],
627 reading_user: reading_user
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
# Godmode: an empty recipient list, which restrict_recipients/3 treats as
# "no recipient restriction".
634 defp user_activities_recipients(%{godmode: true}), do: []
# Otherwise either the public address plus the reading user's AP id and
# followings, or the public address alone.
636 defp user_activities_recipients(%{reading_user: reading_user}) do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
640 [Constants.as_public()]
# Filtering on the announced object's actor needs the object binding, so it
# cannot be combined with skip_preload.
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
# Filters on the activity type and the object's actor, relative to the
# filtering user's AP id.
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
650 [activity, object] in query,
653 "?->>'type' != ? or ?->>'actor' != ?",
# Option not set: no-op.
662 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `since_id`. An empty string
# or a missing :since_id leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
# Tag filters read the second query binding (the object), which is not
# available when preloading is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

# Drops activities whose object carries any of the rejected tags.
defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

# No (or empty) reject list: no-op.
defp restrict_tag_reject(query, _), do: query
# Tag filters read the second query binding (the object), which is not
# available when preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

# Keeps activities whose object carries every one of the given tags.
defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

# No (or empty) tag list: no-op.
defp restrict_tag_all(query, _), do: query
# Tag filters read the second query binding (the object), which is not
# available when preloading is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

# List of tags: the object must carry at least one of them.
defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

# Single tag: the object must carry exactly that tag.
defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

# No tag filter requested.
defp restrict_tag(query, _), do: query
# An empty recipient list means "do not restrict by recipients".
718 defp restrict_recipients(query, [], _user), do: query
# Without a user: the activity's recipients must overlap (`&&`) the given
# list.
720 defp restrict_recipients(query, recipients, nil) do
721 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: same overlap check, or the activity was authored by the user.
724 defp restrict_recipients(query, recipients, user) do
727 where: fragment("? && ?", ^recipients, activity.recipients),
728 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
# Keeps only activities whose actor equals the given :actor_id.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
# Filters on the activity's "type": a single string is compared with `=`,
# any other value is passed to `ANY(?)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
# Keeps only activities whose "state" field equals the given :state.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
# Keeps only activities whose object lists `ap_id` in its "likes".
# Reads the second query binding (the object).
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
# The media filter reads the second query binding (the object), which is not
# available when preloading is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")

# Keeps only Create activities whose object's "attachment" is not the empty
# list.
defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
# exclude_replies: keeps only activities whose object has no "inReplyTo".
789 defp restrict_replies(query, %{exclude_replies: true}) do
791 [_activity, object] in query,
792 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self": non-replies pass; replies are additionally checked
# with an `= ANY(?)` condition relative to the filtering user.
796 defp restrict_replies(query, %{
797 reply_filtering_user: %User{} = user,
798 reply_visibility: "self"
801 [activity, object] in query,
804 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following": the conditions are explained by the inline SQL
# comments below; the address list is the user plus their cached friends.
812 defp restrict_replies(query, %{
813 reply_filtering_user: %User{} = user,
814 reply_visibility: "following"
817 [activity, object] in query,
821 ?->>'type' != 'Create' -- This isn't a Create
822 OR ?->>'inReplyTo' is null -- this isn't a reply
823 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
824 -- unless they are the author (because authors
825 -- are also part of the recipients). This leads
826 -- to a bug that self-replies by friends won't
828 OR ? = ? -- The actor is us
832 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
# No reply-related option: no-op.
841 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
# with_muted: muted content is wanted, so nothing is filtered.
849 defp restrict_muted(query, %{with_muted: true}), do: query
# Filters out activities by muted actors. The mute list comes from
# opts[:muted_users_ap_ids] when provided, otherwise it is loaded for the
# user. The thread-mute condition (`tm.user_id` is nil) is only added when
# preloading is not skipped.
851 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
852 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
855 from([activity] in query,
856 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
859 "not (?->'to' \\?| ?) or ? = ?",
867 unless opts[:skip_preload] do
868 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
# No muting user: no-op.
874 defp restrict_muted(query, _), do: query
# Filters out activities related to users or domains blocked by the blocking
# user. The blocked AP ids come from opts[:blocked_users_ap_ids] when
# provided, otherwise they are loaded for the user. The object is joined when
# the query has no :object named binding yet, since several conditions read
# the object's data.
876 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
877 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
878 domain_blocks = user.domain_blocks || []
880 following_ap_ids = User.get_friends_ap_ids(user)
883 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
886 [activity, object: o] in query,
887 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
890 "((not (? && ?)) or ? = ?)",
898 "recipients_contain_blocked_domains(?, ?) = false",
904 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
911 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
919 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
# No blocking user: no-op.
928 defp restrict_blocked(query, _), do: query
# With `restrict_unlisted: true`, drops activities whose "cc" (treated as
# empty when absent) contains the public address.
930 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
935 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
937 ^[Constants.as_public()]
942 defp restrict_unlisted(query, _), do: query
# With `pinned: true`, keeps only activities whose id is in
# :pinned_activity_ids.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
# Drops Announce activities from actors whose reblogs the muting user has
# muted. The list comes from opts[:reblog_muted_users_ap_ids] when provided,
# otherwise it is loaded for the user.
950 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
951 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
957 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
965 defp restrict_muted_reblogs(query, _), do: query
# Keeps only activities whose actor URL has `instance` as its third
# '/'-separated part (the host part of an "https://host/..." URL).
967 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
970 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
974 defp restrict_instance(query, _), do: query
# Applies the user's content filters: the regex composed by
# Filter.compose_regex/1 is matched case-insensitively (`~*`) against the
# object's "content"; matching activities are dropped unless the user is the
# author.
976 defp restrict_filtered(query, %{user: %User{} = user}) do
977 case Filter.compose_regex(user) do
982 from([activity, object] in query,
984 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
985 activity.actor == ^user.ap_id
# When only :blocking_user is given, it is used as the filtering user.
990 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
991 restrict_filtered(query, %{user: user})
994 defp restrict_filtered(query, _), do: query
# Poll votes (objects of type "Answer") are excluded unless
# `include_poll_votes: true` is passed. The filter needs the :object named
# binding.
996 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
998 defp exclude_poll_votes(query, _) do
999 if has_named_binding?(query, :object) do
1000 from([activity, object: o] in query,
1001 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Chat messages (objects of type "ChatMessage") are excluded unless
# `include_chat_messages: true` is passed. The filter needs the :object named
# binding.
1008 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1010 defp exclude_chat_messages(query, _) do
1011 if has_named_binding?(query, :object) do
1012 from([activity, object: o] in query,
1013 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Unless `invisible_actors: true` is passed, drops activities whose actor is
# one of the users selected by the `invisible: true` user query. Note that the
# AP ids are collected first and then passed to the activity query as a list.
1020 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1022 defp exclude_invisible_actors(query, _opts) do
1024 User.Query.build(%{invisible: true, select: [:ap_id]})
1026 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1028 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity with the given id; only applied for binary ids.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
# Preloads the bookmark of opts[:user] for each activity unless
# `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for the muting user (falling back to
# opts[:user]) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
# Orders by id in the requested direction; any other :order value (or none)
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
# Loads the muting user's outgoing relationships (mutes, reblog mutes and,
# when the blocking user is the same user, blocks) in one call, and returns
# three option maps for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2 with the preloaded AP id lists filled in.
# Because `opts` is merged last, values already present in `opts` win.
1077 defp fetch_activities_query_ap_ids_ops(opts) do
1078 source_user = opts[:muting_user]
1079 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1081 ap_id_relationships =
1082 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1083 [:block | ap_id_relationships]
1088 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1090 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1091 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1093 restrict_muted_reblogs_opts =
1094 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1096 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query by piping through every preload, ordering,
# restrict_* and exclude_* helper. Each helper is a no-op unless its option is
# present in `opts`. The block/mute helpers receive option maps enriched with
# preloaded relationship lists.
# NOTE(review): restrict_filtered(opts) appears twice in this pipeline; the
# second application looks redundant — confirm before removing.
1099 def fetch_activities_query(recipients, opts \\ %{}) do
1100 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1101 fetch_activities_query_ap_ids_ops(opts)
1104 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1108 |> maybe_preload_objects(opts)
1109 |> maybe_preload_bookmarks(opts)
1110 |> maybe_preload_report_notes(opts)
1111 |> maybe_set_thread_muted_field(opts)
1112 |> maybe_order(opts)
1113 |> restrict_recipients(recipients, opts[:user])
1114 |> restrict_replies(opts)
1115 |> restrict_tag(opts)
1116 |> restrict_tag_reject(opts)
1117 |> restrict_tag_all(opts)
1118 |> restrict_since(opts)
1119 |> restrict_local(opts)
1120 |> restrict_remote(opts)
1121 |> restrict_actor(opts)
1122 |> restrict_type(opts)
1123 |> restrict_state(opts)
1124 |> restrict_favorited_by(opts)
1125 |> restrict_blocked(restrict_blocked_opts)
1126 |> restrict_muted(restrict_muted_opts)
1127 |> restrict_filtered(opts)
1128 |> restrict_media(opts)
1129 |> restrict_visibility(opts)
1130 |> restrict_thread_visibility(opts, config)
1131 |> restrict_reblogs(opts)
1132 |> restrict_pinned(opts)
1133 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1134 |> restrict_instance(opts)
1135 |> restrict_announce_object_actor(opts)
1136 |> restrict_filtered(opts)
1137 |> Activity.restrict_deactivated_users()
1138 |> exclude_poll_votes(opts)
1139 |> exclude_chat_messages(opts)
1140 |> exclude_invisible_actors(opts)
1141 |> exclude_visibility(opts)
# Fetches a page of activities for `recipients`, extended with the list
# memberships of opts[:user]. After pagination, maybe_update_cc/3
# post-processes the results using those memberships.
1144 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1145 list_memberships = Pleroma.List.memberships(opts[:user])
1147 fetch_activities_query(recipients ++ list_memberships, opts)
1148 |> Pagination.fetch_paginated(opts, pagination)
1150 |> maybe_update_cc(list_memberships, opts[:user])
1154 Fetches the activities a user has favourited, ordered by when they were added to favourites (most recent first).
1156 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
# Starts from the user's "Like" activities, joins the liked object and its
# activity, and returns that activity with the object attached. The Like's id
# is exposed as `pagination_id` and used for ordering, so the default ordering
# of the paginator is skipped (`skip_order: true`).
1157 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1159 |> Activity.Queries.by_actor()
1160 |> Activity.Queries.by_type("Like")
1161 |> Activity.with_joined_object()
1162 |> Object.with_joined_activity()
1163 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1164 |> order_by([like, _, _], desc_nulls_last: like.id)
1165 |> Pagination.fetch_paginated(
1166 Map.merge(params, %{skip_order: true}),
# For a user with list memberships: when an activity's "bcc" contains one of
# those memberships, the user's AP id is prepended to the activity's "cc".
1171 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1172 Enum.map(activities, fn
1173 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1174 if Enum.any?(bcc, &(&1 in list_memberships)) do
1175 update_in(activity.data["cc"], &[user_ap_id | &1])
# No memberships or no user: activities are returned unchanged.
1185 defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
1187 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1188 from(activity in query,
1190 fragment("? && ?", activity.recipients, ^recipients) or
1191 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1192 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities using the bounded recipient check of
# fetch_activities_bounded_query/3. The base query is built with an empty
# recipient list, so the only recipient restriction is the bounded one.
1196 def fetch_activities_bounded(
1198 recipients_with_public,
1200 pagination \\ :keyset
1202 fetch_activities_query([], opts)
1203 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1204 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` through Upload.store/2 and inserts an Object for the result.
# When opts[:actor] is present it is recorded under the "actor" key of the
# object data. Any non-{:ok, _} result of the store step is returned as is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      data
      |> Maps.put_if_present("actor", opts[:actor])
      |> then(&Repo.insert(%Object{data: &1}))

    failure ->
      failure
  end
end
# Extracts a URL string from the actor's "url" field, which may be a plain
# string, a map with a string "href", or a list. Anything else yields nil.
1217 @spec get_actor_url(any()) :: binary() | nil
1218 defp get_actor_url(url) when is_binary(url), do: url
1219 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1221 defp get_actor_url(url) when is_list(url) do
1227 defp get_actor_url(_url), do: nil
# Converts an ActivityPub actor object into the attribute map used for a
# user: avatar and banner (from "icon" / "image"), profile fields (from
# "PropertyValue" attachments), custom emoji (from "Emoji" tags), flags,
# public key, shared inbox and collection addresses.
# NOTE(review): the attachment filter below uses a single-clause
# `fn %{"type" => t} -> ...`, which raises for an attachment that has no
# "type" key — confirm whether the input is validated before this point.
1229 defp object_to_user_data(data) do
1231 data["icon"]["url"] &&
1234 "url" => [%{"href" => data["icon"]["url"]}]
1238 data["image"]["url"] &&
1241 "url" => [%{"href" => data["image"]["url"]}]
1246 |> Map.get("attachment", [])
1247 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1248 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1252 |> Map.get("tag", [])
1254 %{"type" => "Emoji"} -> true
1257 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1258 {String.trim(name, ":"), url}
1261 is_locked = data["manuallyApprovesFollowers"] || false
1262 capabilities = data["capabilities"] || %{}
1263 accepts_chat_messages = capabilities["acceptsChatMessages"]
1264 data = Transmogrifier.maybe_fix_user_object(data)
1265 is_discoverable = data["discoverable"] || false
1266 invisible = data["invisible"] || false
1267 actor_type = data["type"] || "Person"
# The public key and shared inbox are only read when the containing value is a
# map and the nested value is a string.
1270 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1271 data["publicKey"]["publicKeyPem"]
1277 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1278 data["endpoints"]["sharedInbox"]
1285 uri: get_actor_url(data["url"]),
1290 is_locked: is_locked,
1291 is_discoverable: is_discoverable,
1292 invisible: invisible,
1295 follower_address: data["followers"],
1296 following_address: data["following"],
1297 bio: data["summary"] || "",
1298 actor_type: actor_type,
1299 also_known_as: Map.get(data, "alsoKnownAs", []),
1300 public_key: public_key,
1301 inbox: data["inbox"],
1302 shared_inbox: shared_inbox,
1303 accepts_chat_messages: accepts_chat_messages
1306 # nickname can be nil because of virtual actors
1307 if data["preferredUsername"] do
1311 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1314 Map.put(user_data, :nickname, nil)
# Fetches the user's remote following and followers collections and derives
# the counters (from "totalItems") and the hide_follows / hide_followers
# flags (from collection_private/1). An {:error, _} from any step is returned
# unchanged.
1318 def fetch_follow_information_for_user(user) do
1319 with {:ok, following_data} <-
1320 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1321 {:ok, hide_follows} <- collection_private(following_data),
1322 {:ok, followers_data} <-
1323 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1324 {:ok, hide_followers} <- collection_private(followers_data) do
1327 hide_follows: hide_follows,
1328 follower_count: normalize_counter(followers_data["totalItems"]),
1329 following_count: normalize_counter(following_data["totalItems"]),
1330 hide_followers: hide_followers
1333 {:error, _} = e -> e
# Returns the counter when it is an integer and 0 for anything else
# (for example a missing "totalItems").
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Merges freshly fetched follow information into `user_data`, but only when
# external user synchronization is enabled in the config, the user type is
# "Person" or "Service", and both collection addresses are present. The else
# branch handles each skipped precondition separately and logs a failed
# counter update.
1341 def maybe_update_follow_information(user_data) do
1342 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1343 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1345 {:collections_available,
1346 !!(user_data[:following_address] && user_data[:follower_address])},
1348 fetch_follow_information_for_user(user_data) do
1349 info = Map.merge(user_data[:info] || %{}, info)
1352 |> Map.put(:info, info)
1354 {:user_type_check, false} ->
1357 {:collections_available, false} ->
1360 {:enabled, false} ->
1365 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a remote collection is private. Clause 1: "first" is an
# embedded (Ordered)CollectionPage.
1372 defp collection_private(%{"first" => %{"type" => type}})
1373 when type in ["CollectionPage", "OrderedCollectionPage"],
# Clause 2: "first" is a reference that has to be fetched. A 401 or 403
# response counts as private; other errors are returned as they are.
1376 defp collection_private(%{"first" => first}) do
1377 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1378 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1381 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1382 {:error, _} = e -> e
# Clause 3: no "first" page at all is treated as private.
1387 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through MRF filtering and, when it passes, converts
# it with object_to_user_data/1.
1389 def user_data_from_user_object(data) do
1390 with {:ok, data} <- MRF.filter(data) do
1391 {:ok, object_to_user_data(data)}
# Fetches the remote actor for `ap_id`, converts it to user data and
# optionally enriches it with follow information. Failures are logged at
# different levels: debug for deleted objects, info for rejections, error for
# everything else.
1397 def fetch_and_prepare_user_from_ap_id(ap_id) do
1398 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1399 {:ok, data} <- user_data_from_user_object(data) do
1400 {:ok, maybe_update_follow_information(data)}
1402 # If this has been deleted, only log a debug and not an error
1403 {:error, "Object has been deleted" = e} ->
1404 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1407 {:error, {:reject, reason} = e} ->
1408 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1412 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# When another user already holds the nickname in `data` under a different AP
# id, that old user is renamed to "<id>.<nickname>" so the nickname is freed.
# If the AP ids are equal, nothing is changed and the situation is only
# logged.
1417 def maybe_handle_clashing_nickname(data) do
1418 with nickname when is_binary(nickname) <- data[:nickname],
1419 %User{} = old_user <- User.get_by_nickname(nickname),
1420 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1422 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1428 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1429 |> User.update_and_set_cache()
1431 {:ap_id_comparison, true} ->
1433 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Creates or refreshes the user for `ap_id`. A cached user that is not yet
# AP-enabled is upgraded through the Transmogrifier. Otherwise the remote
# actor is fetched and applied through User.remote_user_changeset, with
# nickname clashes resolved by maybe_handle_clashing_nickname/1.
1441 def make_user_from_ap_id(ap_id) do
1442 user = User.get_cached_by_ap_id(ap_id)
1444 if user && !User.ap_enabled?(user) do
1445 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1447 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1450 |> User.remote_user_changeset(data)
1451 |> User.update_and_set_cache()
1453 maybe_handle_clashing_nickname(data)
1456 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# returns. Any WebFinger result without a non-nil "ap_id" becomes
# {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: an activity passes only when the entire thread
# is visible for the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Query for direct messages: Create activities with "direct" visibility,
# oldest first.
1482 def fetch_direct_messages_query do
1484 |> restrict_type(%{type: "Create"})
1485 |> restrict_visibility(%{visibility: "direct"})
1486 |> order_by([activity], asc: activity.id)