1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
# Checks remote content against the `[:instance, :remote_limit]` setting.
#
# When the nested object carries a non-nil "content", returns whether its
# `String.length/1` is within the configured limit. Any other input passes
# the check (`true`).
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  max_length = Config.get([:instance, :remote_limit])
  body_length = String.length(body)

  body_length <= max_length
end

defp check_remote_limit(_data), do: true
@doc """
Increases the actor's note count, but only for public objects.

Returns the result of `User.increase_note_count/1` when `is_public?(object)`
is truthy, and `{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Decreases the actor's note count, but only for public objects.

Returns the result of `User.decrease_note_count/1` when `is_public?(object)`
is truthy, and `{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
88 @object_types ~w[ChatMessage Question Answer Audio Event]
89 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
90 def persist(%{"type" => type} = object, meta) when type in @object_types do
91 with {:ok, object} <- Object.create(object) do
96 def persist(object, meta) do
97 with local <- Keyword.fetch!(meta, :local),
98 {recipients, _, _} <- get_recipients(object),
100 Repo.insert(%Activity{
103 recipients: recipients,
104 actor: object["actor"]
106 {:ok, activity, meta}
110 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
111 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
112 with nil <- Activity.normalize(map),
113 map <- lazy_put_activity_defaults(map, fake),
114 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
115 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
116 {:ok, map} <- MRF.filter(map),
117 {recipients, _, _} = get_recipients(map),
118 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
119 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
120 {:ok, map, object} <- insert_full_object(map) do
126 recipients: recipients
129 |> maybe_create_activity_expiration()
131 # Splice in the child object if we have one.
132 activity = Maps.put_if_present(activity, :object, object)
134 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
138 %Activity{} = activity ->
141 {:fake, true, map, recipients} ->
142 activity = %Activity{
146 recipients: recipients,
150 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 def notify_and_stream(activity) do
159 Notification.create_notifications(activity)
161 conversation = create_or_bump_conversation(activity, activity.actor)
162 participations = get_participations(conversation)
164 stream_out_participations(participations)
167 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
168 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
173 defp maybe_create_activity_expiration(result), do: result
175 defp create_or_bump_conversation(activity, actor) do
176 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
177 %User{} = user <- User.get_cached_by_ap_id(actor) do
178 Participation.mark_as_read(user, conversation)
183 defp get_participations({:ok, conversation}) do
185 |> Repo.preload(:participations, force: true)
186 |> Map.get(:participations)
189 defp get_participations(_), do: []
191 def stream_out_participations(participations) do
194 |> Repo.preload(:user)
196 Streamer.stream("participation", participations)
199 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
200 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
201 conversation = Repo.preload(conversation, :participations)
204 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
209 if last_activity_id do
210 stream_out_participations(conversation.participations)
215 def stream_out_participations(_, _), do: :noop
217 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
218 when data_type in ["Create", "Announce", "Delete"] do
220 |> Topics.get_activity_topics()
221 |> Streamer.stream(activity)
224 def stream_out(_activity) do
228 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
229 def create(params, fake \\ false) do
230 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
235 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
236 additional = params[:additional] || %{}
237 # only accept false as false value
238 local = !(params[:local] == false)
239 published = params[:published]
240 quick_insert? = Config.get([:env]) == :benchmark
244 %{to: to, actor: actor, published: published, context: context, object: object},
248 with {:ok, activity} <- insert(create_data, local, fake),
249 {:fake, false, activity} <- {:fake, fake, activity},
250 _ <- increase_replies_count_if_reply(create_data),
251 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
252 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
253 _ <- notify_and_stream(activity),
254 :ok <- maybe_federate(activity) do
257 {:quick_insert, true, activity} ->
260 {:fake, true, activity} ->
264 Repo.rollback(message)
268 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
269 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(listen_data, local),
282 _ <- notify_and_stream(activity),
283 :ok <- maybe_federate(activity) do
288 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
289 {:ok, Activity.t()} | nil | {:error, any()}
290 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
291 with {:ok, result} <-
292 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
297 defp do_unfollow(follower, followed, activity_id, local) do
298 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
299 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
300 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
301 {:ok, activity} <- insert(unfollow_data, local),
302 _ <- notify_and_stream(activity),
303 :ok <- maybe_federate(activity) do
307 {:error, error} -> Repo.rollback(error)
311 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
321 # only accept false as false value
322 local = !(params[:local] == false)
323 forward = !(params[:forward] == false)
325 additional = params[:additional] || %{}
329 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
331 Map.merge(additional, %{"to" => [], "cc" => []})
334 with flag_data <- make_flag_data(params, additional),
335 {:ok, activity} <- insert(flag_data, local),
336 {:ok, stripped_activity} <- strip_report_status_data(activity),
337 _ <- notify_and_stream(activity),
338 :ok <- maybe_federate(stripped_activity) do
339 User.all_superusers()
340 |> Enum.filter(fn user -> not is_nil(user.email) end)
341 |> Enum.each(fn superuser ->
343 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
344 |> Pleroma.Emails.Mailer.deliver_async()
351 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
352 def move(%User{} = origin, %User{} = target, local \\ true) do
355 "actor" => origin.ap_id,
356 "object" => origin.ap_id,
357 "target" => target.ap_id
360 with true <- origin.ap_id in target.also_known_as,
361 {:ok, activity} <- insert(params, local),
362 _ <- notify_and_stream(activity) do
363 maybe_federate(activity)
365 BackgroundWorker.enqueue("move_following", %{
366 "origin_id" => origin.id,
367 "target_id" => target.id
372 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
377 def fetch_activities_for_context_query(context, opts) do
378 public = [Constants.as_public()]
382 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
385 from(activity in Activity)
386 |> maybe_preload_objects(opts)
387 |> maybe_preload_bookmarks(opts)
388 |> maybe_set_thread_muted_field(opts)
389 |> restrict_blocked(opts)
390 |> restrict_recipients(recipients, opts[:user])
391 |> restrict_filtered(opts)
395 "?->>'type' = ? and ?->>'context' = ?",
402 |> exclude_poll_votes(opts)
404 |> order_by([activity], desc: activity.id)
407 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
408 def fetch_activities_for_context(context, opts \\ %{}) do
410 |> fetch_activities_for_context_query(opts)
414 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
415 FlakeId.Ecto.CompatType.t() | nil
416 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
418 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
419 |> restrict_visibility(%{visibility: "direct"})
@doc """
Fetches a page of activities using the public address
(`Constants.as_public()`) as the only recipient.

The `:user` key is removed from `opts` before the query is built, so the
query and pagination never see a user. `restrict_unlisted/2` is applied to
the query, and the result is paginated by `Pagination.fetch_paginated/3`
using `pagination` (`:keyset` by default).
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  userless_opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  query =
    public_recipients
    |> fetch_activities_query(userless_opts)
    |> restrict_unlisted(userless_opts)

  Pagination.fetch_paginated(query, userless_opts, pagination)
end
435 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
436 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
438 |> Map.put(:restrict_unlisted, true)
439 |> fetch_public_or_unlisted_activities(pagination)
442 @valid_visibilities ~w[direct unlisted public private]
444 defp restrict_visibility(query, %{visibility: visibility})
445 when is_list(visibility) do
446 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
451 "activity_visibility(?, ?, ?) = ANY (?)",
459 Logger.error("Could not restrict visibility to #{visibility}")
463 defp restrict_visibility(query, %{visibility: visibility})
464 when visibility in @valid_visibilities do
468 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
472 defp restrict_visibility(_query, %{visibility: visibility})
473 when visibility not in @valid_visibilities do
474 Logger.error("Could not restrict visibility to #{visibility}")
477 defp restrict_visibility(query, _visibility), do: query
479 defp exclude_visibility(query, %{exclude_visibilities: visibility})
480 when is_list(visibility) do
481 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
486 "activity_visibility(?, ?, ?) = ANY (?)",
494 Logger.error("Could not exclude visibility to #{visibility}")
499 defp exclude_visibility(query, %{exclude_visibilities: visibility})
500 when visibility in @valid_visibilities do
505 "activity_visibility(?, ?, ?) = ?",
514 defp exclude_visibility(query, %{exclude_visibilities: visibility})
515 when visibility not in [nil | @valid_visibilities] do
516 Logger.error("Could not exclude visibility to #{visibility}")
520 defp exclude_visibility(query, _visibility), do: query
522 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
525 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
528 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
531 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
535 defp restrict_thread_visibility(query, _, _), do: query
537 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
540 |> Map.put(:user, reading_user)
541 |> Map.put(:actor_id, user.ap_id)
544 godmode: params[:godmode],
545 reading_user: reading_user
547 |> user_activities_recipients()
548 |> fetch_activities(params)
552 def fetch_user_activities(user, reading_user, params \\ %{}) do
555 |> Map.put(:type, ["Create", "Announce"])
556 |> Map.put(:user, reading_user)
557 |> Map.put(:actor_id, user.ap_id)
558 |> Map.put(:pinned_activity_ids, user.pinned_activities)
561 if User.blocks?(reading_user, user) do
565 |> Map.put(:blocking_user, reading_user)
566 |> Map.put(:muting_user, reading_user)
570 godmode: params[:godmode],
571 reading_user: reading_user
573 |> user_activities_recipients()
574 |> fetch_activities(params)
578 def fetch_statuses(reading_user, params) do
579 params = Map.put(params, :type, ["Create", "Announce"])
582 godmode: params[:godmode],
583 reading_user: reading_user
585 |> user_activities_recipients()
586 |> fetch_activities(params, :offset)
590 defp user_activities_recipients(%{godmode: true}), do: []
592 defp user_activities_recipients(%{reading_user: reading_user}) do
594 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
596 [Constants.as_public()]
600 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
601 raise "Can't use the child object without preloading!"
604 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
606 [activity, object] in query,
609 "?->>'type' != ? or ?->>'actor' != ?",
618 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
#
# An empty-string `:since_id`, or options without that key, leave the query
# unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: lower_bound}) do
  where(query, [a], a.id > ^lower_bound)
end

defp restrict_since(query, _opts), do: query
628 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
629 raise "Can't use the child object without preloading!"
632 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
634 [_activity, object] in query,
635 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
639 defp restrict_tag_reject(query, _), do: query
641 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
642 raise "Can't use the child object without preloading!"
645 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
647 [_activity, object] in query,
648 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
652 defp restrict_tag_all(query, _), do: query
654 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
655 raise "Can't use the child object without preloading!"
658 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
660 [_activity, object] in query,
661 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
665 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
667 [_activity, object] in query,
668 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
672 defp restrict_tag(query, _), do: query
674 defp restrict_recipients(query, [], _user), do: query
676 defp restrict_recipients(query, recipients, nil) do
677 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
680 defp restrict_recipients(query, recipients, user) do
683 where: fragment("? && ?", ^recipients, activity.recipients),
684 or_where: activity.actor == ^user.ap_id
# Keeps only local activities when the options contain `local_only: true`;
# otherwise returns the query unchanged.
defp restrict_local(query, opts) do
  case opts do
    %{local_only: true} -> where(query, [a], a.local == true)
    _ -> query
  end
end
# Keeps only activities whose actor equals `:actor_id` when that key is
# present in the options; otherwise returns the query unchanged.
defp restrict_actor(query, opts) do
  case opts do
    %{actor_id: actor_ap_id} -> where(query, [a], a.actor == ^actor_ap_id)
    _ -> query
  end
end
# Filters activities on the "type" field of their data.
#
# A binary `:type` is compared for equality; any other `:type` value is
# matched with `= ANY(?)`. Options without `:type` leave the query unchanged.
defp restrict_type(query, %{type: single_type}) when is_binary(single_type) do
  where(query, [a], fragment("?->>'type' = ?", a.data, ^single_type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [a], fragment("?->>'type' = ANY(?)", a.data, ^types))
end

defp restrict_type(query, _opts), do: query
# Keeps only activities whose data "state" equals `:state` when that key is
# present in the options; otherwise returns the query unchanged.
defp restrict_state(query, opts) do
  case opts do
    %{state: wanted_state} ->
      where(query, [a], fragment("?->>'state' = ?", a.data, ^wanted_state))

    _ ->
      query
  end
end
716 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
718 [_activity, object] in query,
719 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
723 defp restrict_favorited_by(query, _), do: query
725 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
726 raise "Can't use the child object without preloading!"
729 defp restrict_media(query, %{only_media: true}) do
731 [activity, object] in query,
732 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
733 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
737 defp restrict_media(query, _), do: query
739 defp restrict_replies(query, %{exclude_replies: true}) do
741 [_activity, object] in query,
742 where: fragment("?->>'inReplyTo' is null", object.data)
746 defp restrict_replies(query, %{
747 reply_filtering_user: user,
748 reply_visibility: "self"
751 [activity, object] in query,
754 "?->>'inReplyTo' is null OR ? = ANY(?)",
762 defp restrict_replies(query, %{
763 reply_filtering_user: user,
764 reply_visibility: "following"
767 [activity, object] in query,
770 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
772 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
781 defp restrict_replies(query, _), do: query
# Drops "Announce" activities when the options contain
# `exclude_reblogs: true`; otherwise returns the query unchanged.
defp restrict_reblogs(query, opts) do
  case opts do
    %{exclude_reblogs: true} ->
      where(query, [a], fragment("?->>'type' != 'Announce'", a.data))

    _ ->
      query
  end
end
789 defp restrict_muted(query, %{with_muted: true}), do: query
791 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
792 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
795 from([activity] in query,
796 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
797 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
800 unless opts[:skip_preload] do
801 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
807 defp restrict_muted(query, _), do: query
809 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
810 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
811 domain_blocks = user.domain_blocks || []
813 following_ap_ids = User.get_friends_ap_ids(user)
816 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
819 [activity, object: o] in query,
820 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
821 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
824 "recipients_contain_blocked_domains(?, ?) = false",
830 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
837 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
845 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
854 defp restrict_blocked(query, _), do: query
856 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
861 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
863 ^[Constants.as_public()]
868 defp restrict_unlisted(query, _), do: query
# Keeps only activities whose id is in `:pinned_activity_ids`, but only when
# the options also contain `pinned: true`; otherwise returns the query
# unchanged.
defp restrict_pinned(query, opts) do
  case opts do
    %{pinned: true, pinned_activity_ids: pinned_ids} ->
      where(query, [a], a.id in ^pinned_ids)

    _ ->
      query
  end
end
876 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
877 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
883 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
891 defp restrict_muted_reblogs(query, _), do: query
893 defp restrict_instance(query, %{instance: instance}) do
898 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
902 from(activity in query, where: activity.actor in ^users)
905 defp restrict_instance(query, _), do: query
907 defp restrict_filtered(query, %{user: %User{} = user}) do
908 case Filter.compose_regex(user) do
913 from([activity, object] in query,
915 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
916 activity.actor == ^user.ap_id
921 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
922 restrict_filtered(query, %{user: user})
925 defp restrict_filtered(query, _), do: query
927 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
929 defp exclude_poll_votes(query, _) do
930 if has_named_binding?(query, :object) do
931 from([activity, object: o] in query,
932 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
939 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
941 defp exclude_chat_messages(query, _) do
942 if has_named_binding?(query, :object) do
943 from([activity, object: o] in query,
944 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
951 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
953 defp exclude_invisible_actors(query, _opts) do
955 User.Query.build(%{invisible: true, select: [:ap_id]})
957 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
959 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity with id `:exclude_id` from the results when that
# option is a binary; otherwise returns the query unchanged.
defp exclude_id(query, opts) do
  case opts do
    %{exclude_id: excluded} when is_binary(excluded) ->
      where(query, [a], a.id != ^excluded)

    _ ->
      query
  end
end
968 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
970 defp maybe_preload_objects(query, _) do
972 |> Activity.with_preloaded_object()
975 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
977 defp maybe_preload_bookmarks(query, opts) do
979 |> Activity.with_preloaded_bookmark(opts[:user])
982 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
984 |> Activity.with_preloaded_report_notes()
987 defp maybe_preload_report_notes(query, _), do: query
989 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
991 defp maybe_set_thread_muted_field(query, opts) do
993 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
996 defp maybe_order(query, %{order: :desc}) do
998 |> order_by(desc: :id)
1001 defp maybe_order(query, %{order: :asc}) do
1003 |> order_by(asc: :id)
1006 defp maybe_order(query, _), do: query
1008 defp fetch_activities_query_ap_ids_ops(opts) do
1009 source_user = opts[:muting_user]
1010 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1012 ap_id_relationships =
1013 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1014 [:block | ap_id_relationships]
1019 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1021 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1022 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1024 restrict_muted_reblogs_opts =
1025 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1027 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1030 def fetch_activities_query(recipients, opts \\ %{}) do
1031 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1032 fetch_activities_query_ap_ids_ops(opts)
1035 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1039 |> maybe_preload_objects(opts)
1040 |> maybe_preload_bookmarks(opts)
1041 |> maybe_preload_report_notes(opts)
1042 |> maybe_set_thread_muted_field(opts)
1043 |> maybe_order(opts)
1044 |> restrict_recipients(recipients, opts[:user])
1045 |> restrict_replies(opts)
1046 |> restrict_tag(opts)
1047 |> restrict_tag_reject(opts)
1048 |> restrict_tag_all(opts)
1049 |> restrict_since(opts)
1050 |> restrict_local(opts)
1051 |> restrict_actor(opts)
1052 |> restrict_type(opts)
1053 |> restrict_state(opts)
1054 |> restrict_favorited_by(opts)
1055 |> restrict_blocked(restrict_blocked_opts)
1056 |> restrict_muted(restrict_muted_opts)
1057 |> restrict_filtered(opts)
1058 |> restrict_media(opts)
1059 |> restrict_visibility(opts)
1060 |> restrict_thread_visibility(opts, config)
1061 |> restrict_reblogs(opts)
1062 |> restrict_pinned(opts)
1063 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1064 |> restrict_instance(opts)
1065 |> restrict_announce_object_actor(opts)
1066 |> restrict_filtered(opts)
1067 |> Activity.restrict_deactivated_users()
1068 |> exclude_poll_votes(opts)
1069 |> exclude_chat_messages(opts)
1070 |> exclude_invisible_actors(opts)
1071 |> exclude_visibility(opts)
1074 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1075 list_memberships = Pleroma.List.memberships(opts[:user])
1077 fetch_activities_query(recipients ++ list_memberships, opts)
1078 |> Pagination.fetch_paginated(opts, pagination)
1080 |> maybe_update_cc(list_memberships, opts[:user])
Fetches the activities a user has added to favorites, ordered by when each one was added.
1086 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1087 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1089 |> Activity.Queries.by_actor()
1090 |> Activity.Queries.by_type("Like")
1091 |> Activity.with_joined_object()
1092 |> Object.with_joined_activity()
1093 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1094 |> order_by([like, _, _], desc_nulls_last: like.id)
1095 |> Pagination.fetch_paginated(
1096 Map.merge(params, %{skip_order: true}),
1101 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1102 Enum.map(activities, fn
1103 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1104 if Enum.any?(bcc, &(&1 in list_memberships)) do
1105 update_in(activity.data["cc"], &[user_ap_id | &1])
1115 defp maybe_update_cc(activities, _, _), do: activities
1117 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1118 from(activity in query,
1120 fragment("? && ?", activity.recipients, ^recipients) or
1121 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1122 ^Constants.as_public() in activity.recipients)
1126 def fetch_activities_bounded(
1128 recipients_with_public,
1130 pagination \\ :keyset
1132 fetch_activities_query([], opts)
1133 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1134 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Stores an uploaded file and records it as an `Object`.

`file` and `opts` are handed to `Upload.store/2`. On `{:ok, data}` the upload
data gets an "actor" entry via `Maps.put_if_present/3` (from `opts[:actor]`)
and is inserted as a new `Object`; the result of `Repo.insert/1` is returned.
Any other result from `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, upload_data} ->
      object_data = Maps.put_if_present(upload_data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    not_stored ->
      not_stored
  end
end
1147 @spec get_actor_url(any()) :: binary() | nil
1148 defp get_actor_url(url) when is_binary(url), do: url
1149 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1151 defp get_actor_url(url) when is_list(url) do
1157 defp get_actor_url(_url), do: nil
1159 defp object_to_user_data(data) do
1161 data["icon"]["url"] &&
1164 "url" => [%{"href" => data["icon"]["url"]}]
1168 data["image"]["url"] &&
1171 "url" => [%{"href" => data["image"]["url"]}]
1176 |> Map.get("attachment", [])
1177 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1178 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1182 |> Map.get("tag", [])
1184 %{"type" => "Emoji"} -> true
1187 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1188 {String.trim(name, ":"), url}
1191 locked = data["manuallyApprovesFollowers"] || false
1192 capabilities = data["capabilities"] || %{}
1193 accepts_chat_messages = capabilities["acceptsChatMessages"]
1194 data = Transmogrifier.maybe_fix_user_object(data)
1195 discoverable = data["discoverable"] || false
1196 invisible = data["invisible"] || false
1197 actor_type = data["type"] || "Person"
1200 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1201 data["publicKey"]["publicKeyPem"]
1207 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1208 data["endpoints"]["sharedInbox"]
1215 uri: get_actor_url(data["url"]),
1221 discoverable: discoverable,
1222 invisible: invisible,
1225 follower_address: data["followers"],
1226 following_address: data["following"],
1227 bio: data["summary"] || "",
1228 actor_type: actor_type,
1229 also_known_as: Map.get(data, "alsoKnownAs", []),
1230 public_key: public_key,
1231 inbox: data["inbox"],
1232 shared_inbox: shared_inbox,
1233 accepts_chat_messages: accepts_chat_messages
1236 # nickname can be nil because of virtual actors
1237 if data["preferredUsername"] do
1241 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1244 Map.put(user_data, :nickname, nil)
1248 def fetch_follow_information_for_user(user) do
1249 with {:ok, following_data} <-
1250 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1251 {:ok, hide_follows} <- collection_private(following_data),
1252 {:ok, followers_data} <-
1253 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1254 {:ok, hide_followers} <- collection_private(followers_data) do
1257 hide_follows: hide_follows,
1258 follower_count: normalize_counter(followers_data["totalItems"]),
1259 following_count: normalize_counter(following_data["totalItems"]),
1260 hide_followers: hide_followers
1263 {:error, _} = e -> e
# Normalizes a counter taken from a remote collection (its "totalItems")
# into a usable count.
#
# The value comes from fetched remote data, so it is not trusted: only
# non-negative integers are accepted as-is. Anything else — a non-integer,
# a missing value, or a negative integer, which cannot be a valid count —
# is normalized to 0.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1271 def maybe_update_follow_information(user_data) do
1272 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1273 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1275 {:collections_available,
1276 !!(user_data[:following_address] && user_data[:follower_address])},
1278 fetch_follow_information_for_user(user_data) do
1279 info = Map.merge(user_data[:info] || %{}, info)
1282 |> Map.put(:info, info)
1284 {:user_type_check, false} ->
1287 {:collections_available, false} ->
1290 {:enabled, false} ->
1295 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1302 defp collection_private(%{"first" => %{"type" => type}})
1303 when type in ["CollectionPage", "OrderedCollectionPage"],
1306 defp collection_private(%{"first" => first}) do
1307 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1308 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1311 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1312 {:error, _} = e -> e
1317 defp collection_private(_data), do: {:ok, true}
1319 def user_data_from_user_object(data) do
1320 with {:ok, data} <- MRF.filter(data) do
1321 {:ok, object_to_user_data(data)}
1327 def fetch_and_prepare_user_from_ap_id(ap_id) do
1328 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1329 {:ok, data} <- user_data_from_user_object(data) do
1330 {:ok, maybe_update_follow_information(data)}
1332 {:error, "Object has been deleted" = e} ->
1333 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1336 {:error, {:reject, reason} = e} ->
1337 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1341 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1346 def maybe_handle_clashing_nickname(data) do
1347 with nickname when is_binary(nickname) <- data[:nickname],
1348 %User{} = old_user <- User.get_by_nickname(nickname),
1349 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1351 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1357 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1358 |> User.update_and_set_cache()
1360 {:ap_id_comparison, true} ->
1362 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1370 def make_user_from_ap_id(ap_id) do
1371 user = User.get_cached_by_ap_id(ap_id)
1373 if user && !User.ap_enabled?(user) do
1374 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1376 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1379 |> User.remote_user_changeset(data)
1380 |> User.update_and_set_cache()
1382 maybe_handle_clashing_nickname(data)
1385 |> User.remote_user_changeset()
1393 def make_user_from_nickname(nickname) do
1394 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1395 make_user_from_ap_id(ap_id)
1397 _e -> {:error, "No AP id in WebFinger"}
# Filter out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Does post-processing on a specific activity for `user`.

This delegates to the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1411 def fetch_direct_messages_query do
1413 |> restrict_type(%{type: "Create"})
1414 |> restrict_visibility(%{visibility: "direct"})
1415 |> order_by([activity], asc: activity.id)