1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub.Persisting do
6 @callback persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
9 defmodule Pleroma.Web.ActivityPub.ActivityPub do
10 alias Pleroma.Activity
11 alias Pleroma.Activity.Ir.Topics
13 alias Pleroma.Constants
14 alias Pleroma.Conversation
15 alias Pleroma.Conversation.Participation
18 alias Pleroma.Notification
20 alias Pleroma.Object.Containment
21 alias Pleroma.Object.Fetcher
22 alias Pleroma.Pagination
26 alias Pleroma.Web.ActivityPub.MRF
27 alias Pleroma.Web.ActivityPub.Transmogrifier
28 alias Pleroma.Web.Streamer
29 alias Pleroma.Web.WebFinger
30 alias Pleroma.Workers.BackgroundWorker
33 import Pleroma.Web.ActivityPub.Utils
34 import Pleroma.Web.ActivityPub.Visibility
37 require Pleroma.Constants
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
# A nil actor is treated as active: this clause returns true without any lookup.
58 defp check_actor_is_active(nil), do: true
60 defp check_actor_is_active(actor) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{deactivated: deactivated} -> not deactivated
# Compares the length of the embedded object's "content" with the configured
# [:instance, :remote_limit] value; the result is true when it is within the limit.
# NOTE(review): the closing `end` of this clause is not visible in this excerpt.
67 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
68 limit = Config.get([:instance, :remote_limit])
# String.length/1 counts graphemes, not bytes.
69 String.length(content) <= limit
# Fallback clause: any input not matched above passes the check.
72 defp check_remote_limit(_), do: true
# Calls User.increase_note_count/1 for `actor` when `object` passes is_public?/1;
# otherwise returns {:ok, actor} without touching the counter.
74 def increase_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Calls User.decrease_note_count/1 for `actor` when `object` passes is_public?/1;
# otherwise returns {:ok, actor} without touching the counter.
78 def decrease_note_count_if_public(actor, object) do
79 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82 defp increase_replies_count_if_reply(%{
83 "object" => %{"inReplyTo" => reply_ap_id} = object,
86 if is_public?(object) do
87 Object.increase_replies_count(reply_ap_id)
# Fallback clause: returns :noop for any input that an earlier clause did not match.
91 defp increase_replies_count_if_reply(_create_data), do: :noop
93 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
95 def persist(%{"type" => type} = object, meta) when type in @object_types do
96 with {:ok, object} <- Object.create(object) do
102 def persist(object, meta) do
103 with local <- Keyword.fetch!(meta, :local),
104 {recipients, _, _} <- get_recipients(object),
106 Repo.insert(%Activity{
109 recipients: recipients,
110 actor: object["actor"]
112 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
113 {:ok, _} <- maybe_create_activity_expiration(activity) do
114 {:ok, activity, meta}
118 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
119 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
120 with nil <- Activity.normalize(map),
121 map <- lazy_put_activity_defaults(map, fake),
122 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
123 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
124 {:ok, map} <- MRF.filter(map),
125 {recipients, _, _} = get_recipients(map),
126 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
127 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
128 {:ok, map, object} <- insert_full_object(map),
129 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
130 # Splice in the child object if we have one.
131 activity = Maps.put_if_present(activity, :object, object)
133 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
134 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
139 %Activity{} = activity ->
145 {:containment, _} = error ->
148 {:error, _} = error ->
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
156 recipients: recipients,
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
163 {:remote_limit_pass, _} ->
164 {:error, :remote_limit}
171 defp insert_activity_with_expiration(data, local, recipients) do
175 actor: data["actor"],
176 recipients: recipients
179 with {:ok, activity} <- Repo.insert(struct) do
180 maybe_create_activity_expiration(activity)
184 def notify_and_stream(activity) do
185 Notification.create_notifications(activity)
187 conversation = create_or_bump_conversation(activity, activity.actor)
188 participations = get_participations(conversation)
190 stream_out_participations(participations)
193 defp maybe_create_activity_expiration(
194 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
197 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
198 activity_id: activity.id,
199 expires_at: expires_at
# Fallback clause: returns the activity unchanged, wrapped as {:ok, activity}.
205 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
207 defp create_or_bump_conversation(activity, actor) do
208 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
209 %User{} = user <- User.get_cached_by_ap_id(actor) do
210 Participation.mark_as_read(user, conversation)
215 defp get_participations({:ok, conversation}) do
217 |> Repo.preload(:participations, force: true)
218 |> Map.get(:participations)
# Fallback clause: anything that an earlier clause did not match yields an empty list.
221 defp get_participations(_), do: []
223 def stream_out_participations(participations) do
226 |> Repo.preload(:user)
228 Streamer.stream("participation", participations)
231 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
232 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
233 conversation = Repo.preload(conversation, :participations)
236 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
241 if last_activity_id do
242 stream_out_participations(conversation.participations)
# Fallback for the two-argument form: returns :noop when no earlier clause matched.
247 def stream_out_participations(_, _), do: :noop
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
256 def stream_out(_activity) do
260 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
261 def create(params, fake \\ false) do
262 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
267 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
268 additional = params[:additional] || %{}
269 # only an explicit `false` counts as false; nil or a missing key is treated as true
270 local = !(params[:local] == false)
271 published = params[:published]
272 quick_insert? = Config.get([:env]) == :benchmark
276 %{to: to, actor: actor, published: published, context: context, object: object},
280 with {:ok, activity} <- insert(create_data, local, fake),
281 {:fake, false, activity} <- {:fake, fake, activity},
282 _ <- increase_replies_count_if_reply(create_data),
283 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
284 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
285 _ <- notify_and_stream(activity),
286 :ok <- maybe_federate(activity) do
289 {:quick_insert, true, activity} ->
292 {:fake, true, activity} ->
296 Repo.rollback(message)
300 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
301 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
302 additional = params[:additional] || %{}
303 # only an explicit `false` counts as false; nil or a missing key is treated as true
304 local = !(params[:local] == false)
305 published = params[:published]
309 %{to: to, actor: actor, published: published, context: context, object: object},
313 with {:ok, activity} <- insert(listen_data, local),
314 _ <- notify_and_stream(activity),
315 :ok <- maybe_federate(activity) do
320 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
321 {:ok, Activity.t()} | nil | {:error, any()}
322 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
323 with {:ok, result} <-
324 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
329 defp do_unfollow(follower, followed, activity_id, local) do
330 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
331 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
332 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
333 {:ok, activity} <- insert(unfollow_data, local),
334 _ <- notify_and_stream(activity),
335 :ok <- maybe_federate(activity) do
339 {:error, error} -> Repo.rollback(error)
343 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
359 # only an explicit `false` counts as false; nil or a missing key is treated as true
360 local = !(params[:local] == false)
361 forward = !(params[:forward] == false)
363 additional = params[:additional] || %{}
367 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 Map.merge(additional, %{"to" => [], "cc" => []})
372 with flag_data <- make_flag_data(params, additional),
373 {:ok, activity} <- insert(flag_data, local),
374 {:ok, stripped_activity} <- strip_report_status_data(activity),
375 _ <- notify_and_stream(activity),
377 maybe_federate(stripped_activity) do
378 User.all_superusers()
379 |> Enum.filter(fn user -> not is_nil(user.email) end)
380 |> Enum.each(fn superuser ->
382 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
383 |> Pleroma.Emails.Mailer.deliver_async()
388 {:error, error} -> Repo.rollback(error)
392 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
393 def move(%User{} = origin, %User{} = target, local \\ true) do
396 "actor" => origin.ap_id,
397 "object" => origin.ap_id,
398 "target" => target.ap_id
401 with true <- origin.ap_id in target.also_known_as,
402 {:ok, activity} <- insert(params, local),
403 _ <- notify_and_stream(activity) do
404 maybe_federate(activity)
406 BackgroundWorker.enqueue("move_following", %{
407 "origin_id" => origin.id,
408 "target_id" => target.id
413 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
418 def fetch_activities_for_context_query(context, opts) do
419 public = [Constants.as_public()]
423 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
426 from(activity in Activity)
427 |> maybe_preload_objects(opts)
428 |> maybe_preload_bookmarks(opts)
429 |> maybe_set_thread_muted_field(opts)
430 |> restrict_blocked(opts)
431 |> restrict_recipients(recipients, opts[:user])
432 |> restrict_filtered(opts)
436 "?->>'type' = ? and ?->>'context' = ?",
443 |> exclude_poll_votes(opts)
445 |> order_by([activity], desc: activity.id)
448 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
449 def fetch_activities_for_context(context, opts \\ %{}) do
451 |> fetch_activities_for_context_query(opts)
455 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
456 FlakeId.Ecto.CompatType.t() | nil
457 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
460 |> restrict_visibility(%{visibility: "direct"})
466 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
467 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
468 opts = Map.delete(opts, :user)
470 [Constants.as_public()]
471 |> fetch_activities_query(opts)
472 |> restrict_unlisted(opts)
473 |> Pagination.fetch_paginated(opts, pagination)
476 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
477 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 |> Map.put(:restrict_unlisted, true)
480 |> fetch_public_or_unlisted_activities(pagination)
483 @valid_visibilities ~w[direct unlisted public private]
485 defp restrict_visibility(query, %{visibility: visibility})
486 when is_list(visibility) do
487 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
492 "activity_visibility(?, ?, ?) = ANY (?)",
500 Logger.error("Could not restrict visibility to #{visibility}")
504 defp restrict_visibility(query, %{visibility: visibility})
505 when visibility in @valid_visibilities do
509 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# A :visibility value outside @valid_visibilities is reported through Logger.error.
# NOTE(review): the remainder of this clause (after the log call) is not visible in
# this excerpt, so its return value should be confirmed against the full file.
513 defp restrict_visibility(_query, %{visibility: visibility})
514 when visibility not in @valid_visibilities do
515 Logger.error("Could not restrict visibility to #{visibility}")
# Fallback clause: returns the query unchanged.
518 defp restrict_visibility(query, _visibility), do: query
520 defp exclude_visibility(query, %{exclude_visibilities: visibility})
521 when is_list(visibility) do
522 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
527 "activity_visibility(?, ?, ?) = ANY (?)",
535 Logger.error("Could not exclude visibility to #{visibility}")
540 defp exclude_visibility(query, %{exclude_visibilities: visibility})
541 when visibility in @valid_visibilities do
546 "activity_visibility(?, ?, ?) = ?",
555 defp exclude_visibility(query, %{exclude_visibilities: visibility})
556 when visibility not in [nil | @valid_visibilities] do
557 Logger.error("Could not exclude visibility to #{visibility}")
561 defp exclude_visibility(query, _visibility), do: query
563 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
566 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
569 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
572 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
576 defp restrict_thread_visibility(query, _, _), do: query
578 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
581 |> Map.put(:user, reading_user)
582 |> Map.put(:actor_id, user.ap_id)
585 godmode: params[:godmode],
586 reading_user: reading_user
588 |> user_activities_recipients()
589 |> fetch_activities(params)
593 def fetch_user_activities(user, reading_user, params \\ %{}) do
596 |> Map.put(:type, ["Create", "Announce"])
597 |> Map.put(:user, reading_user)
598 |> Map.put(:actor_id, user.ap_id)
599 |> Map.put(:pinned_activity_ids, user.pinned_activities)
602 if User.blocks?(reading_user, user) do
606 |> Map.put(:blocking_user, reading_user)
607 |> Map.put(:muting_user, reading_user)
611 godmode: params[:godmode],
612 reading_user: reading_user
614 |> user_activities_recipients()
615 |> fetch_activities(params)
619 def fetch_statuses(reading_user, params) do
620 params = Map.put(params, :type, ["Create", "Announce"])
623 godmode: params[:godmode],
624 reading_user: reading_user
626 |> user_activities_recipients()
627 |> fetch_activities(params, :offset)
# When :godmode is true the recipient list is empty.
631 defp user_activities_recipients(%{godmode: true}), do: []
633 defp user_activities_recipients(%{reading_user: reading_user}) do
635 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
637 [Constants.as_public()]
641 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
642 raise "Can't use the child object without preloading!"
645 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
647 [activity, object] in query,
650 "?->>'type' != ? or ?->>'actor' != ?",
659 defp restrict_announce_object_actor(query, _), do: query
# An empty-string :since_id is ignored; the query is returned unchanged.
661 defp restrict_since(query, %{since_id: ""}), do: query
# Keeps only activities whose id is greater than :since_id.
663 defp restrict_since(query, %{since_id: since_id}) do
664 from(activity in query, where: activity.id > ^since_id)
# Fallback clause: returns the query unchanged.
667 defp restrict_since(query, _), do: query
669 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
670 raise "Can't use the child object without preloading!"
673 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
675 [_activity, object] in query,
676 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
680 defp restrict_tag_reject(query, _), do: query
682 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
683 raise "Can't use the child object without preloading!"
686 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
688 [_activity, object] in query,
689 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
693 defp restrict_tag_all(query, _), do: query
695 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
696 raise "Can't use the child object without preloading!"
699 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
701 [_activity, object] in query,
702 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
706 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
708 [_activity, object] in query,
709 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
713 defp restrict_tag(query, _), do: query
# An empty recipient list adds no restriction; the query is returned unchanged.
715 defp restrict_recipients(query, [], _user), do: query
# With a nil user, filters on the SQL `&&` operator between the given recipients
# and activity.recipients (presumably PostgreSQL array overlap — confirm column type).
717 defp restrict_recipients(query, recipients, nil) do
718 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
721 defp restrict_recipients(query, recipients, user) do
724 where: fragment("? && ?", ^recipients, activity.recipients),
725 or_where: activity.actor == ^user.ap_id
# With local_only: true, keeps only activities whose `local` field is true.
729 defp restrict_local(query, %{local_only: true}) do
730 from(activity in query, where: activity.local == true)
# Fallback clause: returns the query unchanged.
733 defp restrict_local(query, _), do: query
# Keeps only activities whose `actor` equals the given :actor_id.
735 defp restrict_actor(query, %{actor_id: actor_id}) do
736 from(activity in query, where: activity.actor == ^actor_id)
# Fallback clause: returns the query unchanged.
739 defp restrict_actor(query, _), do: query
# A single binary :type is compared for equality with the activity data's 'type'.
741 defp restrict_type(query, %{type: type}) when is_binary(type) do
742 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
# Any other :type value is matched with `= ANY(?)`, i.e. membership in the given
# collection (callers in this file pass lists such as ["Create", "Announce"]).
745 defp restrict_type(query, %{type: type}) do
746 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
# Fallback clause: returns the query unchanged.
749 defp restrict_type(query, _), do: query
# Keeps only activities whose data 'state' equals the given :state.
751 defp restrict_state(query, %{state: state}) do
752 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
# Fallback clause: returns the query unchanged.
755 defp restrict_state(query, _), do: query
757 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
759 [_activity, object] in query,
760 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
764 defp restrict_favorited_by(query, _), do: query
766 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
767 raise "Can't use the child object without preloading!"
770 defp restrict_media(query, %{only_media: true}) do
772 [activity, object] in query,
773 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
774 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
778 defp restrict_media(query, _), do: query
780 defp restrict_replies(query, %{exclude_replies: true}) do
782 [_activity, object] in query,
783 where: fragment("?->>'inReplyTo' is null", object.data)
787 defp restrict_replies(query, %{
788 reply_filtering_user: %User{} = user,
789 reply_visibility: "self"
792 [activity, object] in query,
795 "?->>'inReplyTo' is null OR ? = ANY(?)",
803 defp restrict_replies(query, %{
804 reply_filtering_user: %User{} = user,
805 reply_visibility: "following"
808 [activity, object] in query,
812 ?->>'type' != 'Create' -- This isn't a Create
813 OR ?->>'inReplyTo' is null -- this isn't a reply
814 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
815 -- unless they are the author (because authors
816 -- are also part of the recipients). This leads
817 -- to a bug that self-replies by friends won't
819 OR ? = ? -- The actor is us
823 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
832 defp restrict_replies(query, _), do: query
# With exclude_reblogs: true, drops activities whose data 'type' is 'Announce'.
834 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
835 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
# Fallback clause: returns the query unchanged.
838 defp restrict_reblogs(query, _), do: query
840 defp restrict_muted(query, %{with_muted: true}), do: query
842 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
843 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
846 from([activity] in query,
847 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
850 "not (?->'to' \\?| ?) or ? = ?",
858 unless opts[:skip_preload] do
859 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
865 defp restrict_muted(query, _), do: query
867 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
868 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
869 domain_blocks = user.domain_blocks || []
871 following_ap_ids = User.get_friends_ap_ids(user)
874 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
877 [activity, object: o] in query,
878 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
881 "((not (? && ?)) or ? = ?)",
889 "recipients_contain_blocked_domains(?, ?) = false",
895 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
902 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
910 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
919 defp restrict_blocked(query, _), do: query
921 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
926 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
928 ^[Constants.as_public()]
933 defp restrict_unlisted(query, _), do: query
# With pinned: true, keeps only activities whose id is in :pinned_activity_ids.
# Both keys must be present for this clause to match.
935 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
936 from(activity in query, where: activity.id in ^ids)
# Fallback clause: returns the query unchanged.
939 defp restrict_pinned(query, _), do: query
941 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
942 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
948 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
956 defp restrict_muted_reblogs(query, _), do: query
958 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
961 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
965 defp restrict_instance(query, _), do: query
967 defp restrict_filtered(query, %{user: %User{} = user}) do
968 case Filter.compose_regex(user) do
973 from([activity, object] in query,
975 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
976 activity.actor == ^user.ap_id
981 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
982 restrict_filtered(query, %{user: user})
985 defp restrict_filtered(query, _), do: query
987 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
989 defp exclude_poll_votes(query, _) do
990 if has_named_binding?(query, :object) do
991 from([activity, object: o] in query,
992 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
999 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1001 defp exclude_chat_messages(query, _) do
1002 if has_named_binding?(query, :object) do
1003 from([activity, object: o] in query,
1004 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1011 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1013 defp exclude_invisible_actors(query, _opts) do
1015 User.Query.build(%{invisible: true, select: [:ap_id]})
1017 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1019 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity whose id equals the binary :exclude_id.
1022 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1023 from(activity in query, where: activity.id != ^id)
# Fallback clause (including a non-binary :exclude_id): returns the query unchanged.
1026 defp exclude_id(query, _), do: query
# With skip_preload: true no object preloading is added; the query is returned unchanged.
1028 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1030 defp maybe_preload_objects(query, _) do
1032 |> Activity.with_preloaded_object()
1035 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1037 defp maybe_preload_bookmarks(query, opts) do
1039 |> Activity.with_preloaded_bookmark(opts[:user])
1042 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1044 |> Activity.with_preloaded_report_notes()
1047 defp maybe_preload_report_notes(query, _), do: query
1049 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1051 defp maybe_set_thread_muted_field(query, opts) do
1053 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1056 defp maybe_order(query, %{order: :desc}) do
1058 |> order_by(desc: :id)
1061 defp maybe_order(query, %{order: :asc}) do
1063 |> order_by(asc: :id)
1066 defp maybe_order(query, _), do: query
1068 defp fetch_activities_query_ap_ids_ops(opts) do
1069 source_user = opts[:muting_user]
1070 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1072 ap_id_relationships =
1073 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1074 [:block | ap_id_relationships]
1079 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1081 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1082 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1084 restrict_muted_reblogs_opts =
1085 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1087 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1090 def fetch_activities_query(recipients, opts \\ %{}) do
1091 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1092 fetch_activities_query_ap_ids_ops(opts)
1095 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1099 |> maybe_preload_objects(opts)
1100 |> maybe_preload_bookmarks(opts)
1101 |> maybe_preload_report_notes(opts)
1102 |> maybe_set_thread_muted_field(opts)
1103 |> maybe_order(opts)
1104 |> restrict_recipients(recipients, opts[:user])
1105 |> restrict_replies(opts)
1106 |> restrict_tag(opts)
1107 |> restrict_tag_reject(opts)
1108 |> restrict_tag_all(opts)
1109 |> restrict_since(opts)
1110 |> restrict_local(opts)
1111 |> restrict_actor(opts)
1112 |> restrict_type(opts)
1113 |> restrict_state(opts)
1114 |> restrict_favorited_by(opts)
1115 |> restrict_blocked(restrict_blocked_opts)
1116 |> restrict_muted(restrict_muted_opts)
1117 |> restrict_filtered(opts)
1118 |> restrict_media(opts)
1119 |> restrict_visibility(opts)
1120 |> restrict_thread_visibility(opts, config)
1121 |> restrict_reblogs(opts)
1122 |> restrict_pinned(opts)
1123 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1124 |> restrict_instance(opts)
1125 |> restrict_announce_object_actor(opts)
1126 |> restrict_filtered(opts)
1127 |> Activity.restrict_deactivated_users()
1128 |> exclude_poll_votes(opts)
1129 |> exclude_chat_messages(opts)
1130 |> exclude_invisible_actors(opts)
1131 |> exclude_visibility(opts)
1134 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1135 list_memberships = Pleroma.List.memberships(opts[:user])
1137 fetch_activities_query(recipients ++ list_memberships, opts)
1138 |> Pagination.fetch_paginated(opts, pagination)
1140 |> maybe_update_cc(list_memberships, opts[:user])
1144 Fetches the activities a user has added to favorites, ordered by when they were added (most recent first).
1146 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1147 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1149 |> Activity.Queries.by_actor()
1150 |> Activity.Queries.by_type("Like")
1151 |> Activity.with_joined_object()
1152 |> Object.with_joined_activity()
1153 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1154 |> order_by([like, _, _], desc_nulls_last: like.id)
1155 |> Pagination.fetch_paginated(
1156 Map.merge(params, %{skip_order: true}),
1161 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1162 Enum.map(activities, fn
1163 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1164 if Enum.any?(bcc, &(&1 in list_memberships)) do
1165 update_in(activity.data["cc"], &[user_ap_id | &1])
1175 defp maybe_update_cc(activities, _, _), do: activities
1177 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1178 from(activity in query,
1180 fragment("? && ?", activity.recipients, ^recipients) or
1181 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1182 ^Constants.as_public() in activity.recipients)
1186 def fetch_activities_bounded(
1188 recipients_with_public,
1190 pagination \\ :keyset
1192 fetch_activities_query([], opts)
1193 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1194 |> Pagination.fetch_paginated(opts, pagination)
1198 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1199 def upload(file, opts \\ []) do
1200 with {:ok, data} <- Upload.store(file, opts) do
1201 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1203 Repo.insert(%Object{data: obj_data})
# Extracts an actor URL from the value passed in.
1207 @spec get_actor_url(any()) :: binary() | nil
# A binary is returned as-is.
1208 defp get_actor_url(url) when is_binary(url), do: url
# A map with a binary "href" yields that href.
1209 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1211 defp get_actor_url(url) when is_list(url) do
# Fallback clause: any value not handled by an earlier clause yields nil.
1217 defp get_actor_url(_url), do: nil
1219 defp object_to_user_data(data) do
1221 data["icon"]["url"] &&
1224 "url" => [%{"href" => data["icon"]["url"]}]
1228 data["image"]["url"] &&
1231 "url" => [%{"href" => data["image"]["url"]}]
1236 |> Map.get("attachment", [])
1237 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1238 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1242 |> Map.get("tag", [])
1244 %{"type" => "Emoji"} -> true
1247 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1248 {String.trim(name, ":"), url}
1251 is_locked = data["manuallyApprovesFollowers"] || false
1252 capabilities = data["capabilities"] || %{}
1253 accepts_chat_messages = capabilities["acceptsChatMessages"]
1254 data = Transmogrifier.maybe_fix_user_object(data)
1255 is_discoverable = data["discoverable"] || false
1256 invisible = data["invisible"] || false
1257 actor_type = data["type"] || "Person"
1260 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1261 data["publicKey"]["publicKeyPem"]
1267 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1268 data["endpoints"]["sharedInbox"]
1275 uri: get_actor_url(data["url"]),
1280 is_locked: is_locked,
1281 is_discoverable: is_discoverable,
1282 invisible: invisible,
1285 follower_address: data["followers"],
1286 following_address: data["following"],
1287 bio: data["summary"] || "",
1288 actor_type: actor_type,
1289 also_known_as: Map.get(data, "alsoKnownAs", []),
1290 public_key: public_key,
1291 inbox: data["inbox"],
1292 shared_inbox: shared_inbox,
1293 accepts_chat_messages: accepts_chat_messages
1296 # nickname can be nil because of virtual actors
1297 if data["preferredUsername"] do
1301 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1304 Map.put(user_data, :nickname, nil)
1308 def fetch_follow_information_for_user(user) do
1309 with {:ok, following_data} <-
1310 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1311 {:ok, hide_follows} <- collection_private(following_data),
1312 {:ok, followers_data} <-
1313 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1314 {:ok, hide_followers} <- collection_private(followers_data) do
1317 hide_follows: hide_follows,
1318 follower_count: normalize_counter(followers_data["totalItems"]),
1319 following_count: normalize_counter(following_data["totalItems"]),
1320 hide_followers: hide_followers
1323 {:error, _} = e -> e
# Normalizes a counter value: integers pass through unchanged...
1328 defp normalize_counter(counter) when is_integer(counter), do: counter
# ...and anything else (nil, strings, etc.) becomes 0.
1329 defp normalize_counter(_), do: 0
1331 def maybe_update_follow_information(user_data) do
1332 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1333 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1335 {:collections_available,
1336 !!(user_data[:following_address] && user_data[:follower_address])},
1338 fetch_follow_information_for_user(user_data) do
1339 info = Map.merge(user_data[:info] || %{}, info)
1342 |> Map.put(:info, info)
1344 {:user_type_check, false} ->
1347 {:collections_available, false} ->
1350 {:enabled, false} ->
1355 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1362 defp collection_private(%{"first" => %{"type" => type}})
1363 when type in ["CollectionPage", "OrderedCollectionPage"],
1366 defp collection_private(%{"first" => first}) do
1367 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1368 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1371 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1372 {:error, _} = e -> e
1377 defp collection_private(_data), do: {:ok, true}
1379 def user_data_from_user_object(data) do
1380 with {:ok, data} <- MRF.filter(data) do
1381 {:ok, object_to_user_data(data)}
1387 def fetch_and_prepare_user_from_ap_id(ap_id) do
1388 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1389 {:ok, data} <- user_data_from_user_object(data) do
1390 {:ok, maybe_update_follow_information(data)}
1392 # If this has been deleted, only log a debug and not an error
1393 {:error, "Object has been deleted" = e} ->
1394 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1397 {:error, {:reject, reason} = e} ->
1398 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1402 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1407 def maybe_handle_clashing_nickname(data) do
1408 with nickname when is_binary(nickname) <- data[:nickname],
1409 %User{} = old_user <- User.get_by_nickname(nickname),
1410 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1412 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1418 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1419 |> User.update_and_set_cache()
1421 {:ap_id_comparison, true} ->
1423 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1431 def make_user_from_ap_id(ap_id) do
1432 user = User.get_cached_by_ap_id(ap_id)
1434 if user && !User.ap_enabled?(user) do
1435 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1437 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1440 |> User.remote_user_changeset(data)
1441 |> User.update_and_set_cache()
1443 maybe_handle_clashing_nickname(data)
1446 |> User.remote_user_changeset()
1454 def make_user_from_nickname(nickname) do
1455 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1456 make_user_from_ap_id(ap_id)
1458 _e -> {:error, "No AP id in WebFinger"}
1462 # filter out broken threads
# Delegates to entire_thread_visible_for_user?/2 for the given activity and user.
1463 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1464 entire_thread_visible_for_user?(activity, user)
1467 # do post-processing on a specific activity
# Public entry point; the only step visible here is contain_broken_threads/2.
1468 def contain_activity(%Activity{} = activity, %User{} = user) do
1469 contain_broken_threads(activity, user)
1472 def fetch_direct_messages_query do
1474 |> restrict_type(%{type: "Create"})
1475 |> restrict_visibility(%{visibility: "direct"})
1476 |> order_by([activity], asc: activity.id)