1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Event]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 {:ok, activity, meta}
109 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
110 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
111 with nil <- Activity.normalize(map),
112 map <- lazy_put_activity_defaults(map, fake),
113 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
114 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
115 {:ok, map} <- MRF.filter(map),
116 {recipients, _, _} = get_recipients(map),
117 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
118 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
119 {:ok, map, object} <- insert_full_object(map),
120 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
121 # Splice in the child object if we have one.
122 activity = Maps.put_if_present(activity, :object, object)
124 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
128 %Activity{} = activity ->
134 {:containment, _} = error ->
137 {:error, _} = error ->
140 {:fake, true, map, recipients} ->
141 activity = %Activity{
145 recipients: recipients,
149 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
152 {:remote_limit_pass, _} ->
153 {:error, :remote_limit}
160 defp insert_activity_with_expiration(data, local, recipients) do
164 actor: data["actor"],
165 recipients: recipients
168 |> maybe_create_activity_expiration()
171 def notify_and_stream(activity) do
172 Notification.create_notifications(activity)
174 conversation = create_or_bump_conversation(activity, activity.actor)
175 participations = get_participations(conversation)
177 stream_out_participations(participations)
180 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
182 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
183 activity_id: activity.id,
184 expires_at: expires_at
190 defp maybe_create_activity_expiration(result), do: result
192 defp create_or_bump_conversation(activity, actor) do
193 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
194 %User{} = user <- User.get_cached_by_ap_id(actor) do
195 Participation.mark_as_read(user, conversation)
200 defp get_participations({:ok, conversation}) do
202 |> Repo.preload(:participations, force: true)
203 |> Map.get(:participations)
206 defp get_participations(_), do: []
208 def stream_out_participations(participations) do
211 |> Repo.preload(:user)
213 Streamer.stream("participation", participations)
216 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
217 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
218 conversation = Repo.preload(conversation, :participations)
221 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
226 if last_activity_id do
227 stream_out_participations(conversation.participations)
232 def stream_out_participations(_, _), do: :noop
234 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
235 when data_type in ["Create", "Announce", "Delete"] do
237 |> Topics.get_activity_topics()
238 |> Streamer.stream(activity)
241 def stream_out(_activity) do
245 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
246 def create(params, fake \\ false) do
247 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
252 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
253 additional = params[:additional] || %{}
254 # only accept false as false value
255 local = !(params[:local] == false)
256 published = params[:published]
257 quick_insert? = Config.get([:env]) == :benchmark
261 %{to: to, actor: actor, published: published, context: context, object: object},
265 with {:ok, activity} <- insert(create_data, local, fake),
266 {:fake, false, activity} <- {:fake, fake, activity},
267 _ <- increase_replies_count_if_reply(create_data),
268 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
269 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
270 _ <- notify_and_stream(activity),
271 :ok <- maybe_federate(activity) do
274 {:quick_insert, true, activity} ->
277 {:fake, true, activity} ->
281 Repo.rollback(message)
285 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
286 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
287 additional = params[:additional] || %{}
288 # only accept false as false value
289 local = !(params[:local] == false)
290 published = params[:published]
294 %{to: to, actor: actor, published: published, context: context, object: object},
298 with {:ok, activity} <- insert(listen_data, local),
299 _ <- notify_and_stream(activity),
300 :ok <- maybe_federate(activity) do
305 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
306 {:ok, Activity.t()} | nil | {:error, any()}
307 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
308 with {:ok, result} <-
309 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
314 defp do_unfollow(follower, followed, activity_id, local) do
315 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
316 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
317 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
318 {:ok, activity} <- insert(unfollow_data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
324 {:error, error} -> Repo.rollback(error)
328 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
338 # only accept false as false value
339 local = !(params[:local] == false)
340 forward = !(params[:forward] == false)
342 additional = params[:additional] || %{}
346 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
348 Map.merge(additional, %{"to" => [], "cc" => []})
351 with flag_data <- make_flag_data(params, additional),
352 {:ok, activity} <- insert(flag_data, local),
353 {:ok, stripped_activity} <- strip_report_status_data(activity),
354 _ <- notify_and_stream(activity),
355 :ok <- maybe_federate(stripped_activity) do
356 User.all_superusers()
357 |> Enum.filter(fn user -> not is_nil(user.email) end)
358 |> Enum.each(fn superuser ->
360 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
361 |> Pleroma.Emails.Mailer.deliver_async()
368 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
369 def move(%User{} = origin, %User{} = target, local \\ true) do
372 "actor" => origin.ap_id,
373 "object" => origin.ap_id,
374 "target" => target.ap_id
377 with true <- origin.ap_id in target.also_known_as,
378 {:ok, activity} <- insert(params, local),
379 _ <- notify_and_stream(activity) do
380 maybe_federate(activity)
382 BackgroundWorker.enqueue("move_following", %{
383 "origin_id" => origin.id,
384 "target_id" => target.id
389 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
394 def fetch_activities_for_context_query(context, opts) do
395 public = [Constants.as_public()]
399 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
402 from(activity in Activity)
403 |> maybe_preload_objects(opts)
404 |> maybe_preload_bookmarks(opts)
405 |> maybe_set_thread_muted_field(opts)
406 |> restrict_blocked(opts)
407 |> restrict_recipients(recipients, opts[:user])
408 |> restrict_filtered(opts)
412 "?->>'type' = ? and ?->>'context' = ?",
419 |> exclude_poll_votes(opts)
421 |> order_by([activity], desc: activity.id)
424 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
425 def fetch_activities_for_context(context, opts \\ %{}) do
427 |> fetch_activities_for_context_query(opts)
431 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
432 FlakeId.Ecto.CompatType.t() | nil
433 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
435 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
436 |> restrict_visibility(%{visibility: "direct"})
442 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
443 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
444 opts = Map.delete(opts, :user)
446 [Constants.as_public()]
447 |> fetch_activities_query(opts)
448 |> restrict_unlisted(opts)
449 |> Pagination.fetch_paginated(opts, pagination)
452 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
453 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
455 |> Map.put(:restrict_unlisted, true)
456 |> fetch_public_or_unlisted_activities(pagination)
459 @valid_visibilities ~w[direct unlisted public private]
461 defp restrict_visibility(query, %{visibility: visibility})
462 when is_list(visibility) do
463 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
468 "activity_visibility(?, ?, ?) = ANY (?)",
476 Logger.error("Could not restrict visibility to #{visibility}")
480 defp restrict_visibility(query, %{visibility: visibility})
481 when visibility in @valid_visibilities do
485 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
489 defp restrict_visibility(_query, %{visibility: visibility})
490 when visibility not in @valid_visibilities do
491 Logger.error("Could not restrict visibility to #{visibility}")
494 defp restrict_visibility(query, _visibility), do: query
496 defp exclude_visibility(query, %{exclude_visibilities: visibility})
497 when is_list(visibility) do
498 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
503 "activity_visibility(?, ?, ?) = ANY (?)",
511 Logger.error("Could not exclude visibility to #{visibility}")
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when visibility in @valid_visibilities do
522 "activity_visibility(?, ?, ?) = ?",
531 defp exclude_visibility(query, %{exclude_visibilities: visibility})
532 when visibility not in [nil | @valid_visibilities] do
533 Logger.error("Could not exclude visibility to #{visibility}")
537 defp exclude_visibility(query, _visibility), do: query
539 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
542 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
545 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
548 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
552 defp restrict_thread_visibility(query, _, _), do: query
554 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
557 |> Map.put(:user, reading_user)
558 |> Map.put(:actor_id, user.ap_id)
561 godmode: params[:godmode],
562 reading_user: reading_user
564 |> user_activities_recipients()
565 |> fetch_activities(params)
569 def fetch_user_activities(user, reading_user, params \\ %{}) do
572 |> Map.put(:type, ["Create", "Announce"])
573 |> Map.put(:user, reading_user)
574 |> Map.put(:actor_id, user.ap_id)
575 |> Map.put(:pinned_activity_ids, user.pinned_activities)
578 if User.blocks?(reading_user, user) do
582 |> Map.put(:blocking_user, reading_user)
583 |> Map.put(:muting_user, reading_user)
587 godmode: params[:godmode],
588 reading_user: reading_user
590 |> user_activities_recipients()
591 |> fetch_activities(params)
595 def fetch_statuses(reading_user, params) do
596 params = Map.put(params, :type, ["Create", "Announce"])
599 godmode: params[:godmode],
600 reading_user: reading_user
602 |> user_activities_recipients()
603 |> fetch_activities(params, :offset)
607 defp user_activities_recipients(%{godmode: true}), do: []
609 defp user_activities_recipients(%{reading_user: reading_user}) do
611 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
613 [Constants.as_public()]
617 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
618 raise "Can't use the child object without preloading!"
621 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
623 [activity, object] in query,
626 "?->>'type' != ? or ?->>'actor' != ?",
635 defp restrict_announce_object_actor(query, _), do: query
637 defp restrict_since(query, %{since_id: ""}), do: query
639 defp restrict_since(query, %{since_id: since_id}) do
640 from(activity in query, where: activity.id > ^since_id)
643 defp restrict_since(query, _), do: query
645 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
646 raise "Can't use the child object without preloading!"
649 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
651 [_activity, object] in query,
652 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
656 defp restrict_tag_reject(query, _), do: query
658 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
659 raise "Can't use the child object without preloading!"
662 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
664 [_activity, object] in query,
665 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
669 defp restrict_tag_all(query, _), do: query
671 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
672 raise "Can't use the child object without preloading!"
675 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
677 [_activity, object] in query,
678 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
682 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
689 defp restrict_tag(query, _), do: query
691 defp restrict_recipients(query, [], _user), do: query
693 defp restrict_recipients(query, recipients, nil) do
694 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
697 defp restrict_recipients(query, recipients, user) do
700 where: fragment("? && ?", ^recipients, activity.recipients),
701 or_where: activity.actor == ^user.ap_id
705 defp restrict_local(query, %{local_only: true}) do
706 from(activity in query, where: activity.local == true)
709 defp restrict_local(query, _), do: query
711 defp restrict_actor(query, %{actor_id: actor_id}) do
712 from(activity in query, where: activity.actor == ^actor_id)
715 defp restrict_actor(query, _), do: query
717 defp restrict_type(query, %{type: type}) when is_binary(type) do
718 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
721 defp restrict_type(query, %{type: type}) do
722 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
725 defp restrict_type(query, _), do: query
727 defp restrict_state(query, %{state: state}) do
728 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
731 defp restrict_state(query, _), do: query
733 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
735 [_activity, object] in query,
736 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
740 defp restrict_favorited_by(query, _), do: query
742 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
743 raise "Can't use the child object without preloading!"
746 defp restrict_media(query, %{only_media: true}) do
748 [activity, object] in query,
749 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
750 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
754 defp restrict_media(query, _), do: query
756 defp restrict_replies(query, %{exclude_replies: true}) do
758 [_activity, object] in query,
759 where: fragment("?->>'inReplyTo' is null", object.data)
763 defp restrict_replies(query, %{
764 reply_filtering_user: user,
765 reply_visibility: "self"
768 [activity, object] in query,
771 "?->>'inReplyTo' is null OR ? = ANY(?)",
779 defp restrict_replies(query, %{
780 reply_filtering_user: user,
781 reply_visibility: "following"
784 [activity, object] in query,
787 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
789 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
798 defp restrict_replies(query, _), do: query
800 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
801 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
804 defp restrict_reblogs(query, _), do: query
806 defp restrict_muted(query, %{with_muted: true}), do: query
808 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
809 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
812 from([activity] in query,
813 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
814 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
817 unless opts[:skip_preload] do
818 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
824 defp restrict_muted(query, _), do: query
826 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
827 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
828 domain_blocks = user.domain_blocks || []
830 following_ap_ids = User.get_friends_ap_ids(user)
833 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
836 [activity, object: o] in query,
837 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
838 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
841 "recipients_contain_blocked_domains(?, ?) = false",
847 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
854 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
862 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
871 defp restrict_blocked(query, _), do: query
873 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
878 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
880 ^[Constants.as_public()]
885 defp restrict_unlisted(query, _), do: query
887 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
888 from(activity in query, where: activity.id in ^ids)
891 defp restrict_pinned(query, _), do: query
893 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
894 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
900 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
908 defp restrict_muted_reblogs(query, _), do: query
910 defp restrict_instance(query, %{instance: instance}) do
915 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
919 from(activity in query, where: activity.actor in ^users)
922 defp restrict_instance(query, _), do: query
924 defp restrict_filtered(query, %{user: %User{} = user}) do
925 case Filter.compose_regex(user) do
930 from([activity, object] in query,
932 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
933 activity.actor == ^user.ap_id
938 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
939 restrict_filtered(query, %{user: user})
942 defp restrict_filtered(query, _), do: query
944 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
946 defp exclude_poll_votes(query, _) do
947 if has_named_binding?(query, :object) do
948 from([activity, object: o] in query,
949 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
956 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
958 defp exclude_chat_messages(query, _) do
959 if has_named_binding?(query, :object) do
960 from([activity, object: o] in query,
961 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
968 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
970 defp exclude_invisible_actors(query, _opts) do
972 User.Query.build(%{invisible: true, select: [:ap_id]})
974 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
976 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
979 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
980 from(activity in query, where: activity.id != ^id)
983 defp exclude_id(query, _), do: query
985 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
987 defp maybe_preload_objects(query, _) do
989 |> Activity.with_preloaded_object()
992 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
994 defp maybe_preload_bookmarks(query, opts) do
996 |> Activity.with_preloaded_bookmark(opts[:user])
999 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1001 |> Activity.with_preloaded_report_notes()
1004 defp maybe_preload_report_notes(query, _), do: query
1006 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1008 defp maybe_set_thread_muted_field(query, opts) do
1010 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1013 defp maybe_order(query, %{order: :desc}) do
1015 |> order_by(desc: :id)
1018 defp maybe_order(query, %{order: :asc}) do
1020 |> order_by(asc: :id)
1023 defp maybe_order(query, _), do: query
1025 defp fetch_activities_query_ap_ids_ops(opts) do
1026 source_user = opts[:muting_user]
1027 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1029 ap_id_relationships =
1030 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1031 [:block | ap_id_relationships]
1036 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1038 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1039 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1041 restrict_muted_reblogs_opts =
1042 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1044 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1047 def fetch_activities_query(recipients, opts \\ %{}) do
1048 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1049 fetch_activities_query_ap_ids_ops(opts)
1052 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1056 |> maybe_preload_objects(opts)
1057 |> maybe_preload_bookmarks(opts)
1058 |> maybe_preload_report_notes(opts)
1059 |> maybe_set_thread_muted_field(opts)
1060 |> maybe_order(opts)
1061 |> restrict_recipients(recipients, opts[:user])
1062 |> restrict_replies(opts)
1063 |> restrict_tag(opts)
1064 |> restrict_tag_reject(opts)
1065 |> restrict_tag_all(opts)
1066 |> restrict_since(opts)
1067 |> restrict_local(opts)
1068 |> restrict_actor(opts)
1069 |> restrict_type(opts)
1070 |> restrict_state(opts)
1071 |> restrict_favorited_by(opts)
1072 |> restrict_blocked(restrict_blocked_opts)
1073 |> restrict_muted(restrict_muted_opts)
1074 |> restrict_filtered(opts)
1075 |> restrict_media(opts)
1076 |> restrict_visibility(opts)
1077 |> restrict_thread_visibility(opts, config)
1078 |> restrict_reblogs(opts)
1079 |> restrict_pinned(opts)
1080 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1081 |> restrict_instance(opts)
1082 |> restrict_announce_object_actor(opts)
1083 |> restrict_filtered(opts)
1084 |> Activity.restrict_deactivated_users()
1085 |> exclude_poll_votes(opts)
1086 |> exclude_chat_messages(opts)
1087 |> exclude_invisible_actors(opts)
1088 |> exclude_visibility(opts)
1091 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1092 list_memberships = Pleroma.List.memberships(opts[:user])
1094 fetch_activities_query(recipients ++ list_memberships, opts)
1095 |> Pagination.fetch_paginated(opts, pagination)
1097 |> maybe_update_cc(list_memberships, opts[:user])
1101 Fetch favorites activities of user with order by sort adds to favorites
1103 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1104 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1106 |> Activity.Queries.by_actor()
1107 |> Activity.Queries.by_type("Like")
1108 |> Activity.with_joined_object()
1109 |> Object.with_joined_activity()
1110 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1111 |> order_by([like, _, _], desc_nulls_last: like.id)
1112 |> Pagination.fetch_paginated(
1113 Map.merge(params, %{skip_order: true}),
1118 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1119 Enum.map(activities, fn
1120 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1121 if Enum.any?(bcc, &(&1 in list_memberships)) do
1122 update_in(activity.data["cc"], &[user_ap_id | &1])
1132 defp maybe_update_cc(activities, _, _), do: activities
1134 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1135 from(activity in query,
1137 fragment("? && ?", activity.recipients, ^recipients) or
1138 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1139 ^Constants.as_public() in activity.recipients)
1143 def fetch_activities_bounded(
1145 recipients_with_public,
1147 pagination \\ :keyset
1149 fetch_activities_query([], opts)
1150 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1151 |> Pagination.fetch_paginated(opts, pagination)
1155 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1156 def upload(file, opts \\ []) do
1157 with {:ok, data} <- Upload.store(file, opts) do
1158 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1160 Repo.insert(%Object{data: obj_data})
1164 @spec get_actor_url(any()) :: binary() | nil
1165 defp get_actor_url(url) when is_binary(url), do: url
1166 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1168 defp get_actor_url(url) when is_list(url) do
1174 defp get_actor_url(_url), do: nil
1176 defp object_to_user_data(data) do
1178 data["icon"]["url"] &&
1181 "url" => [%{"href" => data["icon"]["url"]}]
1185 data["image"]["url"] &&
1188 "url" => [%{"href" => data["image"]["url"]}]
1193 |> Map.get("attachment", [])
1194 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1195 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1199 |> Map.get("tag", [])
1201 %{"type" => "Emoji"} -> true
1204 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1205 {String.trim(name, ":"), url}
1208 locked = data["manuallyApprovesFollowers"] || false
1209 capabilities = data["capabilities"] || %{}
1210 accepts_chat_messages = capabilities["acceptsChatMessages"]
1211 data = Transmogrifier.maybe_fix_user_object(data)
1212 discoverable = data["discoverable"] || false
1213 invisible = data["invisible"] || false
1214 actor_type = data["type"] || "Person"
1217 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1218 data["publicKey"]["publicKeyPem"]
1224 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1225 data["endpoints"]["sharedInbox"]
1232 uri: get_actor_url(data["url"]),
1238 discoverable: discoverable,
1239 invisible: invisible,
1242 follower_address: data["followers"],
1243 following_address: data["following"],
1244 bio: data["summary"] || "",
1245 actor_type: actor_type,
1246 also_known_as: Map.get(data, "alsoKnownAs", []),
1247 public_key: public_key,
1248 inbox: data["inbox"],
1249 shared_inbox: shared_inbox,
1250 accepts_chat_messages: accepts_chat_messages
1253 # nickname can be nil because of virtual actors
1254 if data["preferredUsername"] do
1258 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1261 Map.put(user_data, :nickname, nil)
1265 def fetch_follow_information_for_user(user) do
1266 with {:ok, following_data} <-
1267 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1268 {:ok, hide_follows} <- collection_private(following_data),
1269 {:ok, followers_data} <-
1270 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1271 {:ok, hide_followers} <- collection_private(followers_data) do
1274 hide_follows: hide_follows,
1275 follower_count: normalize_counter(followers_data["totalItems"]),
1276 following_count: normalize_counter(following_data["totalItems"]),
1277 hide_followers: hide_followers
1280 {:error, _} = e -> e
1285 defp normalize_counter(counter) when is_integer(counter), do: counter
1286 defp normalize_counter(_), do: 0
1288 def maybe_update_follow_information(user_data) do
1289 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1290 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1292 {:collections_available,
1293 !!(user_data[:following_address] && user_data[:follower_address])},
1295 fetch_follow_information_for_user(user_data) do
1296 info = Map.merge(user_data[:info] || %{}, info)
1299 |> Map.put(:info, info)
1301 {:user_type_check, false} ->
1304 {:collections_available, false} ->
1307 {:enabled, false} ->
1312 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1319 defp collection_private(%{"first" => %{"type" => type}})
1320 when type in ["CollectionPage", "OrderedCollectionPage"],
1323 defp collection_private(%{"first" => first}) do
1324 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1325 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1328 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1329 {:error, _} = e -> e
1334 defp collection_private(_data), do: {:ok, true}
1336 def user_data_from_user_object(data) do
1337 with {:ok, data} <- MRF.filter(data) do
1338 {:ok, object_to_user_data(data)}
1344 def fetch_and_prepare_user_from_ap_id(ap_id) do
1345 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1346 {:ok, data} <- user_data_from_user_object(data) do
1347 {:ok, maybe_update_follow_information(data)}
1349 {:error, "Object has been deleted" = e} ->
1350 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1353 {:error, {:reject, reason} = e} ->
1354 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1358 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1363 def maybe_handle_clashing_nickname(data) do
1364 with nickname when is_binary(nickname) <- data[:nickname],
1365 %User{} = old_user <- User.get_by_nickname(nickname),
1366 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1368 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1374 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1375 |> User.update_and_set_cache()
1377 {:ap_id_comparison, true} ->
1379 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1387 def make_user_from_ap_id(ap_id) do
1388 user = User.get_cached_by_ap_id(ap_id)
1390 if user && !User.ap_enabled?(user) do
1391 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1393 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1396 |> User.remote_user_changeset(data)
1397 |> User.update_and_set_cache()
1399 maybe_handle_clashing_nickname(data)
1402 |> User.remote_user_changeset()
1410 def make_user_from_nickname(nickname) do
1411 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1412 make_user_from_ap_id(ap_id)
1414 _e -> {:error, "No AP id in WebFinger"}
1418 # filter out broken threads
1419 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1420 entire_thread_visible_for_user?(activity, user)
1423 # do post-processing on a specific activity
1424 def contain_activity(%Activity{} = activity, %User{} = user) do
1425 contain_broken_threads(activity, user)
1428 def fetch_direct_messages_query do
1430 |> restrict_type(%{type: "Create"})
1431 |> restrict_visibility(%{visibility: "direct"})
1432 |> order_by([activity], asc: activity.id)