1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
# Collects the addressing fields ("to", "cc", "bcc") of an activity map; each
# missing field defaults to []. For "Create" activities the actor is appended
# and duplicates are removed; for every other type the three lists are simply
# concatenated. Callers destructure the result as a 3-tuple.
# NOTE(review): a missing "actor" defaults to [], so `[actor]` would add an
# empty list as a recipient — confirm this cannot happen in practice.
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
# A nil actor counts as active. For a binary AP id the cached user is looked up
# and, when a %User{} is found, the result is the negation of `deactivated`.
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
# Returns true when the object's content length (String.length/1) does not
# exceed the configured [:instance, :remote_limit]. Maps without a non-nil
# "content" in their "object" always pass.
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
67 defp check_remote_limit(_), do: true
# Increments the actor's note count when `object` is public; otherwise returns
# {:ok, actor} untouched.
69 def increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Decrements the actor's note count when `object` is public; otherwise returns
# {:ok, actor} untouched.
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# When the create data embeds an object with an "inReplyTo" and that object is
# public, bumps the replies counter of the replied-to object. Any other input
# is a no-op (:noop).
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
# Persists an incoming map. Types listed in @object_types are stored through
# Object.create/1; everything else is inserted as an %Activity{} whose
# recipients come from get_recipients/1. `meta` must contain :local
# (Keyword.fetch!/2 raises otherwise).
88 @object_types ["ChatMessage", "Question", "Answer"]
89 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
90 def persist(%{"type" => type} = object, meta) when type in @object_types do
91 with {:ok, object} <- Object.create(object) do
96 def persist(object, meta) do
97 with local <- Keyword.fetch!(meta, :local),
98 {recipients, _, _} <- get_recipients(object),
100 Repo.insert(%Activity{
103 recipients: recipients,
104 actor: object["actor"]
106 {:ok, activity, meta}
# Inserts an activity map. The `with` chain only proceeds when
# Activity.normalize/1 finds no existing activity (nil); it then applies
# defaults, checks that the actor is active (skipped when bypass_actor_check is
# true), enforces the remote content limit, runs the MRF filter, checks
# containment and inserts the full object. An already-existing %Activity{} and
# the `fake` case are handled in the fall-through clauses; for `fake` an
# %Activity{} struct is built in place and rich-media data is fetched for it.
110 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
111 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
112 with nil <- Activity.normalize(map),
113 map <- lazy_put_activity_defaults(map, fake),
114 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
115 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
116 {:ok, map} <- MRF.filter(map),
117 {recipients, _, _} = get_recipients(map),
118 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
119 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
120 {:ok, map, object} <- insert_full_object(map) do
126 recipients: recipients
129 |> maybe_create_activity_expiration()
131 # Splice in the child object if we have one.
132 activity = Maps.put_if_present(activity, :object, object)
134 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
138 %Activity{} = activity ->
141 {:fake, true, map, recipients} ->
142 activity = %Activity{
146 recipients: recipients,
150 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# Creates notifications for the activity, creates or bumps its conversation on
# behalf of the activity's actor, and streams out the resulting participations.
158 def notify_and_stream(activity) do
159 Notification.create_notifications(activity)
161 conversation = create_or_bump_conversation(activity, activity.actor)
162 participations = get_participations(conversation)
164 stream_out_participations(participations)
# For an {:ok, activity} result whose data carries "expires_at", creates an
# ActivityExpiration for it. Any other result is passed through unchanged.
167 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
168 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
173 defp maybe_create_activity_expiration(result), do: result
# Creates or bumps the conversation for the activity and, when the actor
# resolves to a cached %User{}, marks that conversation as read for the user.
175 defp create_or_bump_conversation(activity, actor) do
176 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
177 %User{} = user <- User.get_cached_by_ap_id(actor) do
178 Participation.mark_as_read(user, conversation)
# Returns the force-reloaded participations of an {:ok, conversation} result;
# any other argument yields an empty list.
183 defp get_participations({:ok, conversation}) do
185 |> Repo.preload(:participations, force: true)
186 |> Map.get(:participations)
189 defp get_participations(_), do: []
# stream_out_participations/1 preloads each participation's user and streams
# them on the "participation" topic.
# stream_out_participations/2 takes an object with a "context": it looks up the
# conversation for that context, preloads its participations and re-streams
# them when a latest direct activity id exists for the context. Any other
# arguments return :noop.
191 def stream_out_participations(participations) do
194 |> Repo.preload(:user)
196 Streamer.stream("participation", participations)
199 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
200 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
201 conversation = Repo.preload(conversation, :participations)
204 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
209 if last_activity_id do
210 stream_out_participations(conversation.participations)
215 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by Topics.get_activity_topics/1. Other activities hit the catch-all clause.
217 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
218 when data_type in ["Create", "Announce", "Delete"] do
220 |> Topics.get_activity_topics()
221 |> Streamer.stream(activity)
224 def stream_out(_activity) do
# Public entry point for creating a "Create" activity: runs do_create/2 inside
# a Repo transaction. `fake` defaults to false.
228 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
229 def create(params, fake \\ false) do
230 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body for create/2. Inserts the activity, then (unless `fake`)
# bumps the parent's replies count, and (unless running in the :benchmark env,
# where quick_insert? is true) bumps the actor's note count, notifies/streams
# and federates. `local` is true unless params[:local] is exactly false.
# Unmatched results are rolled back via Repo.rollback/1.
235 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
236 additional = params[:additional] || %{}
237 # only accept false as false value
238 local = !(params[:local] == false)
239 published = params[:published]
240 quick_insert? = Config.get([:env]) == :benchmark
244 %{to: to, actor: actor, published: published, context: context, object: object},
248 with {:ok, activity} <- insert(create_data, local, fake),
249 {:fake, false, activity} <- {:fake, fake, activity},
250 _ <- increase_replies_count_if_reply(create_data),
251 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
252 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
253 _ <- notify_and_stream(activity),
254 :ok <- maybe_federate(activity) do
257 {:quick_insert, true, activity} ->
260 {:fake, true, activity} ->
264 Repo.rollback(message)
# Builds listen data from the given params, inserts it, then notifies/streams
# and federates the resulting activity. `local` is true unless params[:local]
# is exactly false.
268 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
269 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(listen_data, local),
282 _ <- notify_and_stream(activity),
283 :ok <- maybe_federate(activity) do
# Creates an "Accept" activity; thin wrapper around accept_or_reject/2.
288 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
289 def accept(params) do
290 accept_or_reject("Accept", params)
# Creates a "Reject" activity; thin wrapper around accept_or_reject/2.
293 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def reject(params) do
295 accept_or_reject("Reject", params)
# Shared implementation of accept/1 and reject/1. Builds the activity data
# (adding "id" only when :activity_id is present), inserts it, notifies/streams
# and federates. :local defaults to true.
298 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
299 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
300 local = Map.get(params, :local, true)
301 activity_id = Map.get(params, :activity_id, nil)
304 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
305 |> Maps.put_if_present("id", activity_id)
307 with {:ok, activity} <- insert(data, local),
308 _ <- notify_and_stream(activity),
309 :ok <- maybe_federate(activity) do
# Public entry point for unfollowing: runs do_unfollow/4 inside a Repo
# transaction. `activity_id` defaults to nil and `local` to true.
314 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
315 {:ok, Activity.t()} | nil | {:error, any()}
316 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
317 with {:ok, result} <-
318 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body for unfollow/4. Finds the latest follow activity, marks its
# state "cancelled", inserts the unfollow activity, notifies/streams and
# federates. An {:error, error} result rolls the transaction back.
323 defp do_unfollow(follower, followed, activity_id, local) do
324 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
325 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
326 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
327 {:ok, activity} <- insert(unfollow_data, local),
328 _ <- notify_and_stream(activity),
329 :ok <- maybe_federate(activity) do
333 {:error, error} -> Repo.rollback(error)
# Creates a flag (report) activity. Builds the flag data, inserts it, federates
# a copy with report status data stripped, and emails a report to every
# superuser that has a non-nil email. `local` and `forward` are true unless
# explicitly passed as false; the "cc" is either the reported account's ap_id
# or empty (the selecting condition is presumably `forward` — confirm).
337 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
347 # only accept false as false value
348 local = !(params[:local] == false)
349 forward = !(params[:forward] == false)
351 additional = params[:additional] || %{}
355 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
357 Map.merge(additional, %{"to" => [], "cc" => []})
360 with flag_data <- make_flag_data(params, additional),
361 {:ok, activity} <- insert(flag_data, local),
362 {:ok, stripped_activity} <- strip_report_status_data(activity),
363 _ <- notify_and_stream(activity),
364 :ok <- maybe_federate(stripped_activity) do
365 User.all_superusers()
366 |> Enum.filter(fn user -> not is_nil(user.email) end)
367 |> Enum.each(fn superuser ->
369 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
370 |> Pleroma.Emails.Mailer.deliver_async()
# Creates a move activity from `origin` to `target`. Requires origin.ap_id to
# be listed in target.also_known_as; otherwise returns an {:error, message}
# tuple. On success the activity is inserted, notified/streamed and federated,
# and a "move_following" background job is enqueued with both user ids.
377 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
378 def move(%User{} = origin, %User{} = target, local \\ true) do
381 "actor" => origin.ap_id,
382 "object" => origin.ap_id,
383 "target" => target.ap_id
386 with true <- origin.ap_id in target.also_known_as,
387 {:ok, activity} <- insert(params, local),
388 _ <- notify_and_stream(activity) do
389 maybe_federate(activity)
391 BackgroundWorker.enqueue("move_following", %{
392 "origin_id" => origin.id,
393 "target_id" => target.id
398 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities belonging to a context (thread). Applies
# optional preloads, block/recipient/filter restrictions, matches on the
# activity's type and context, excludes poll votes and orders by id descending.
# With a user in opts, recipients are the user's ap_id, their followings and
# the public address.
403 def fetch_activities_for_context_query(context, opts) do
404 public = [Constants.as_public()]
408 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
411 from(activity in Activity)
412 |> maybe_preload_objects(opts)
413 |> maybe_preload_bookmarks(opts)
414 |> maybe_set_thread_muted_field(opts)
415 |> restrict_blocked(opts)
416 |> restrict_recipients(recipients, opts[:user])
417 |> restrict_filtered(opts)
421 "?->>'type' = ? and ?->>'context' = ?",
428 |> exclude_poll_votes(opts)
430 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using
# fetch_activities_for_context_query/2. `opts` defaults to an empty map.
433 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
434 def fetch_activities_for_context(context, opts \\ %{}) do
436 |> fetch_activities_for_context_query(opts)
# Looks up the id of the latest direct-visibility activity in a context (or nil,
# per the spec). Preloading is skipped by default (skip_preload: true) but can
# be overridden through `opts`.
440 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
441 FlakeId.Ecto.CompatType.t() | nil
442 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
444 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
445 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection. The :user
# option is dropped before querying; unlisted activities are excluded only when
# opts has restrict_unlisted: true.
451 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
452 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
453 opts = Map.delete(opts, :user)
455 [Constants.as_public()]
456 |> fetch_activities_query(opts)
457 |> restrict_unlisted(opts)
458 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but forces
# restrict_unlisted: true so unlisted activities are excluded.
461 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
462 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
464 |> Map.put(:restrict_unlisted, true)
465 |> fetch_public_or_unlisted_activities(pagination)
# Restricts a query by the SQL function activity_visibility(actor, recipients,
# data). :visibility may be a single value or a list; every value must be one
# of @valid_visibilities, otherwise the problem is logged with Logger.error.
# Without a :visibility key the query is returned unchanged.
468 @valid_visibilities ~w[direct unlisted public private]
470 defp restrict_visibility(query, %{visibility: visibility})
471 when is_list(visibility) do
472 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
477 "activity_visibility(?, ?, ?) = ANY (?)",
485 Logger.error("Could not restrict visibility to #{visibility}")
489 defp restrict_visibility(query, %{visibility: visibility})
490 when visibility in @valid_visibilities do
494 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
498 defp restrict_visibility(_query, %{visibility: visibility})
499 when visibility not in @valid_visibilities do
500 Logger.error("Could not restrict visibility to #{visibility}")
503 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 keyed on :exclude_visibilities. Accepts a
# list or a single value, validates against @valid_visibilities and logs
# invalid values (nil is not treated as invalid). Without the key the query is
# returned unchanged.
# NOTE(review): presumably the activity_visibility fragment is negated to
# exclude matches — confirm against the full clause.
505 defp exclude_visibility(query, %{exclude_visibilities: visibility})
506 when is_list(visibility) do
507 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
512 "activity_visibility(?, ?, ?) = ANY (?)",
520 Logger.error("Could not exclude visibility to #{visibility}")
525 defp exclude_visibility(query, %{exclude_visibilities: visibility})
526 when visibility in @valid_visibilities do
531 "activity_visibility(?, ?, ?) = ?",
540 defp exclude_visibility(query, %{exclude_visibilities: visibility})
541 when visibility not in [nil | @valid_visibilities] do
542 Logger.error("Could not exclude visibility to #{visibility}")
546 defp exclude_visibility(query, _visibility), do: query
# Applies thread containment: for a user with an ap_id, keeps only activities
# for which the SQL function thread_visibility(ap_id, activity id) is true.
# Dedicated clauses match when containment is disabled in the config map or on
# the user (skip_thread_containment: true); without a user the query is
# returned unchanged.
548 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
551 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
554 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
557 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
561 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` (actor_id = user.ap_id) as seen by
# `reading_user`. Recipients are derived by user_activities_recipients/1 from
# params[:godmode] and the reading user.
563 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
566 |> Map.put(:user, reading_user)
567 |> Map.put(:actor_id, user.ap_id)
570 godmode: params[:godmode],
571 reading_user: reading_user
573 |> user_activities_recipients()
574 |> fetch_activities(params)
# Fetches the "Create" and "Announce" activities authored by `user` as seen by
# `reading_user`, passing along the user's pinned activity ids. Whether
# reading_user blocks `user` decides if blocking_user/muting_user are set in
# the params (the exact branch assignment is not restated here).
578 def fetch_user_activities(user, reading_user, params \\ %{}) do
581 |> Map.put(:type, ["Create", "Announce"])
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584 |> Map.put(:pinned_activity_ids, user.pinned_activities)
587 if User.blocks?(reading_user, user) do
591 |> Map.put(:blocking_user, reading_user)
592 |> Map.put(:muting_user, reading_user)
596 godmode: params[:godmode],
597 reading_user: reading_user
599 |> user_activities_recipients()
600 |> fetch_activities(params)
# Fetches "Create" and "Announce" activities visible to `reading_user` (or all
# when params[:godmode] is true), using offset pagination.
604 def fetch_statuses(reading_user, params) do
605 params = Map.put(params, :type, ["Create", "Announce"])
608 godmode: params[:godmode],
609 reading_user: reading_user
611 |> user_activities_recipients()
612 |> fetch_activities(params, :offset)
# Computes the recipient list used to scope user activity queries. In godmode
# the list is empty (restrict_recipients/3 skips filtering for []). Otherwise
# it is either public + the reading user's ap_id + their followings, or just
# public (presumably when there is no reading user — confirm).
616 defp user_activities_recipients(%{godmode: true}), do: []
618 defp user_activities_recipients(%{reading_user: reading_user}) do
620 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
622 [Constants.as_public()]
# With :announce_filtering_user set, keeps rows where the activity type differs
# from a given value or the object's actor differs from that user's ap_id (see
# the fragment). Needs the joined object, so combining it with
# skip_preload: true raises. Without the option the query is unchanged.
626 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
627 raise "Can't use the child object without preloading!"
630 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
632 [activity, object] in query,
635 "?->>'type' != ? or ?->>'actor' != ?",
644 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities with an id greater than :since_id. An empty-string
# since_id or a missing key leaves the query unchanged.
646 defp restrict_since(query, %{since_id: ""}), do: query
648 defp restrict_since(query, %{since_id: since_id}) do
649 from(activity in query, where: activity.id > ^since_id)
652 defp restrict_since(query, _), do: query
# Excludes activities whose object carries any of the tags in a non-empty
# :tag_reject list (jsonb `?|` operator, negated). Needs the joined object, so
# it raises when combined with skip_preload: true.
654 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
655 raise "Can't use the child object without preloading!"
658 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
660 [_activity, object] in query,
661 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
665 defp restrict_tag_reject(query, _), do: query
# Keeps only activities whose object carries all of the tags in a non-empty
# :tag_all list (jsonb `?&` operator). Needs the joined object, so it raises
# when combined with skip_preload: true.
667 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
668 raise "Can't use the child object without preloading!"
671 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
673 [_activity, object] in query,
674 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
678 defp restrict_tag_all(query, _), do: query
# Keeps only activities whose object is tagged with :tag. A list matches any of
# its entries (jsonb `?|`); a binary matches that single tag (jsonb `?`).
# Needs the joined object, so it raises when combined with skip_preload: true.
680 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
681 raise "Can't use the child object without preloading!"
684 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
686 [_activity, object] in query,
687 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
691 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
693 [_activity, object] in query,
694 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
698 defp restrict_tag(query, _), do: query
# Keeps activities whose recipients overlap (`&&`) the given list. An empty
# list disables the filter. When a user is given, activities authored by that
# user are also included (or_where on the actor).
700 defp restrict_recipients(query, [], _user), do: query
702 defp restrict_recipients(query, recipients, nil) do
703 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
706 defp restrict_recipients(query, recipients, user) do
709 where: fragment("? && ?", ^recipients, activity.recipients),
710 or_where: activity.actor == ^user.ap_id
# With local_only: true, keeps only activities flagged as local.
714 defp restrict_local(query, %{local_only: true}) do
715 from(activity in query, where: activity.local == true)
718 defp restrict_local(query, _), do: query
# With :actor_id set, keeps only activities whose actor equals that id.
720 defp restrict_actor(query, %{actor_id: actor_id}) do
721 from(activity in query, where: activity.actor == ^actor_id)
724 defp restrict_actor(query, _), do: query
# Filters on the activity's "type". A binary is compared for equality; any
# other :type value is passed to `= ANY(?)`, i.e. it is expected to be a list.
726 defp restrict_type(query, %{type: type}) when is_binary(type) do
727 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
730 defp restrict_type(query, %{type: type}) do
731 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
734 defp restrict_type(query, _), do: query
# With :state set, keeps only activities whose data "state" equals it.
736 defp restrict_state(query, %{state: state}) do
737 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
740 defp restrict_state(query, _), do: query
# With :favorited_by set, keeps only activities whose object's "likes" contain
# the given ap_id (jsonb `?` operator on the joined object).
742 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
744 [_activity, object] in query,
745 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
749 defp restrict_favorited_by(query, _), do: query
# With only_media: true, keeps only "Create" activities whose object has a
# non-empty "attachment" list. Needs the joined object, so any :only_media
# value combined with skip_preload: true raises.
751 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
752 raise "Can't use the child object without preloading!"
755 defp restrict_media(query, %{only_media: true}) do
757 [activity, object] in query,
758 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
759 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
763 defp restrict_media(query, _), do: query
# Reply filtering on the joined object:
#   * exclude_replies: true keeps only objects without "inReplyTo";
#   * reply_visibility "self" keeps non-replies or rows matching an ANY check
#     (presumably the filtering user among the recipients — confirm);
#   * reply_visibility "following" keeps non-replies or rows matching an overlap
#     check against the user's own ap_id plus their cached friends' ap_ids, or
#     an equality check (operands per the fragment's bindings).
# Anything else leaves the query unchanged.
765 defp restrict_replies(query, %{exclude_replies: true}) do
767 [_activity, object] in query,
768 where: fragment("?->>'inReplyTo' is null", object.data)
772 defp restrict_replies(query, %{
773 reply_filtering_user: user,
774 reply_visibility: "self"
777 [activity, object] in query,
780 "?->>'inReplyTo' is null OR ? = ANY(?)",
788 defp restrict_replies(query, %{
789 reply_filtering_user: user,
790 reply_visibility: "following"
793 [activity, object] in query,
796 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
798 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
807 defp restrict_replies(query, _), do: query
# With exclude_reblogs: true, drops activities of type "Announce".
809 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
810 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
813 defp restrict_reblogs(query, _), do: query
# Hides activities from, or addressed ("to") to, users muted by :muting_user.
# The muted ap_ids come from opts[:muted_users_ap_ids] when supplied, otherwise
# from User.muted_users_ap_ids/1. Unless preloading is skipped, rows with a
# thread mute (tm.user_id set) are filtered out too. with_muted: true disables
# the filter entirely.
815 defp restrict_muted(query, %{with_muted: true}), do: query
817 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
818 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
821 from([activity] in query,
822 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
823 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
826 unless opts[:skip_preload] do
827 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
833 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by :blocking_user.
# Blocked ap_ids come from opts[:blocked_users_ap_ids] when supplied, otherwise
# from User.blocked_users_ap_ids/1. The object is joined if the :object binding
# is missing. Filters visible here: actor not blocked, recipients do not
# overlap the blocked ids, recipients contain no blocked domain, no Announce
# addressed to blocked ids, and the actor's / object actor's host is not
# blocked unless the id is in an allow list (presumably following_ap_ids —
# confirm).
835 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
836 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
837 domain_blocks = user.domain_blocks || []
839 following_ap_ids = User.get_friends_ap_ids(user)
842 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
845 [activity, object: o] in query,
846 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
847 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
850 "recipients_contain_blocked_domains(?, ?) = false",
856 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
863 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
871 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
880 defp restrict_blocked(query, _), do: query
# With restrict_unlisted: true, drops rows whose "cc" (defaulting to an empty
# jsonb value) contains the public address.
882 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
887 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
889 ^[Constants.as_public()]
894 defp restrict_unlisted(query, _), do: query
# With pinned: true and :pinned_activity_ids, keeps only activities whose id is
# in that list.
896 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
897 from(activity in query, where: activity.id in ^ids)
900 defp restrict_pinned(query, _), do: query
# Hides "Announce" activities from users whose reblogs :muting_user has muted.
# The list comes from opts[:reblog_muted_users_ap_ids] when supplied, otherwise
# from User.reblog_muted_users_ap_ids/1.
902 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
903 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
909 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
917 defp restrict_muted_reblogs(query, _), do: query
# With :instance set, keeps only activities whose actor is among `users`,
# derived from users whose nickname ends in "@<instance>" (LIKE match).
919 defp restrict_instance(query, %{instance: instance}) do
924 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
928 from(activity in query, where: activity.actor in ^users)
931 defp restrict_instance(query, _), do: query
# Applies the user's content filters: using the regex from
# Filter.compose_regex/1, keeps rows whose object content does not match it
# (case-insensitive `~*`) or that were authored by the user. A :blocking_user
# is treated the same as :user. Without a user the query is unchanged.
933 defp restrict_filtered(query, %{user: %User{} = user}) do
934 case Filter.compose_regex(user) do
939 from([activity, object] in query,
941 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
942 activity.actor == ^user.ap_id
947 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
948 restrict_filtered(query, %{user: user})
951 defp restrict_filtered(query, _), do: query
# Drops activities whose object type is "Answer" (poll votes) when the query
# has the :object binding. include_poll_votes: true disables the filter.
953 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
955 defp exclude_poll_votes(query, _) do
956 if has_named_binding?(query, :object) do
957 from([activity, object: o] in query,
958 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Drops activities whose object type is "ChatMessage" when the query has the
# :object binding. include_chat_messages: true disables the filter.
965 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
967 defp exclude_chat_messages(query, _) do
968 if has_named_binding?(query, :object) do
969 from([activity, object: o] in query,
970 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Drops activities authored by invisible users: collects the ap_ids of users
# matching invisible: true and excludes those actors. invisible_actors: true
# disables the filter.
977 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
979 defp exclude_invisible_actors(query, _opts) do
981 User.Query.build(%{invisible: true, select: [:ap_id]})
983 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
985 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id, drops the activity with that id.
988 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
989 from(activity in query, where: activity.id != ^id)
992 defp exclude_id(query, _), do: query
# Preloads the activity's object unless skip_preload: true is set.
994 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
996 defp maybe_preload_objects(query, _) do
998 |> Activity.with_preloaded_object()
# Preloads the bookmark for opts[:user] unless skip_preload: true is set.
1001 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1003 defp maybe_preload_bookmarks(query, opts) do
1005 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes only when preload_report_notes: true is set.
1008 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1010 |> Activity.with_preloaded_report_notes()
1013 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for opts[:muting_user] (falling back to
# opts[:user]) unless skip_preload: true is set.
1015 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1017 defp maybe_set_thread_muted_field(query, opts) do
1019 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id descending or ascending when :order is :desc or :asc; otherwise
# the query is returned unchanged.
1022 defp maybe_order(query, %{order: :desc}) do
1024 |> order_by(desc: :id)
1027 defp maybe_order(query, %{order: :asc}) do
1029 |> order_by(asc: :id)
1032 defp maybe_order(query, _), do: query
# Preloads relationship ap_ids for the muting user in a single call so the
# restrict_* helpers do not each query for them. Mutes and reblog mutes are
# requested whenever a muting user exists; blocks are added only when the
# blocking user is that same user. Returns three opts maps (blocked, muted,
# muted reblogs); values already present in `opts` take precedence because
# `opts` is the second argument of Map.merge/2.
1034 defp fetch_activities_query_ap_ids_ops(opts) do
1035 source_user = opts[:muting_user]
1036 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1038 ap_id_relationships =
1039 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1040 [:block | ap_id_relationships]
1045 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1047 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1048 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1050 restrict_muted_reblogs_opts =
1051 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1053 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query: optional preloads and ordering first, then
# every restrict_*/exclude_* filter in sequence. Block/mute helpers receive
# opts enriched by fetch_activities_query_ap_ids_ops/1, and thread visibility
# uses the instance-level skip_thread_containment setting.
# NOTE(review): restrict_filtered(opts) is piped twice (after restrict_muted
# and again after restrict_announce_object_actor); the second application
# looks redundant — confirm before removing.
1056 def fetch_activities_query(recipients, opts \\ %{}) do
1057 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1058 fetch_activities_query_ap_ids_ops(opts)
1061 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1065 |> maybe_preload_objects(opts)
1066 |> maybe_preload_bookmarks(opts)
1067 |> maybe_preload_report_notes(opts)
1068 |> maybe_set_thread_muted_field(opts)
1069 |> maybe_order(opts)
1070 |> restrict_recipients(recipients, opts[:user])
1071 |> restrict_replies(opts)
1072 |> restrict_tag(opts)
1073 |> restrict_tag_reject(opts)
1074 |> restrict_tag_all(opts)
1075 |> restrict_since(opts)
1076 |> restrict_local(opts)
1077 |> restrict_actor(opts)
1078 |> restrict_type(opts)
1079 |> restrict_state(opts)
1080 |> restrict_favorited_by(opts)
1081 |> restrict_blocked(restrict_blocked_opts)
1082 |> restrict_muted(restrict_muted_opts)
1083 |> restrict_filtered(opts)
1084 |> restrict_media(opts)
1085 |> restrict_visibility(opts)
1086 |> restrict_thread_visibility(opts, config)
1087 |> restrict_reblogs(opts)
1088 |> restrict_pinned(opts)
1089 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1090 |> restrict_instance(opts)
1091 |> restrict_announce_object_actor(opts)
1092 |> restrict_filtered(opts)
1093 |> Activity.restrict_deactivated_users()
1094 |> exclude_poll_votes(opts)
1095 |> exclude_chat_messages(opts)
1096 |> exclude_invisible_actors(opts)
1097 |> exclude_visibility(opts)
# Fetches a page of activities for `recipients` extended with the list
# memberships of opts[:user], then post-processes the result with
# maybe_update_cc/3. Pagination defaults to :keyset.
1100 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1101 list_memberships = Pleroma.List.memberships(opts[:user])
1103 fetch_activities_query(recipients ++ list_memberships, opts)
1104 |> Pagination.fetch_paginated(opts, pagination)
1106 |> maybe_update_cc(list_memberships, opts[:user])
1110 Fetches the activities a user has favourited, sorted by when they were added to favourites (descending Like id).
# Selects the user's "Like" activities, joins each liked object and that
# object's activity, and returns the activity with its object attached and
# pagination_id set to the Like's id. Results are ordered by Like id
# descending; skip_order: true is passed to the paginator so this ordering is
# kept.
1112 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1113 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1115 |> Activity.Queries.by_actor()
1116 |> Activity.Queries.by_type("Like")
1117 |> Activity.with_joined_object()
1118 |> Object.with_joined_activity()
1119 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1120 |> order_by([like, _, _], desc_nulls_last: like.id)
1121 |> Pagination.fetch_paginated(
1122 Map.merge(params, %{skip_order: true}),
# When the user has list memberships, prepends the user's ap_id to the "cc" of
# every activity whose non-empty "bcc" mentions one of those lists. With no
# memberships or no user the activities are returned unchanged.
1127 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1128 Enum.map(activities, fn
1129 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1130 if Enum.any?(bcc, &(&1 in list_memberships)) do
1131 update_in(activity.data["cc"], &[user_ap_id | &1])
1141 defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities whose recipients overlap `recipients`, or that overlap
# `recipients_with_public` and are also addressed to the public collection.
1143 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1144 from(activity in query,
1146 fragment("? && ?", activity.recipients, ^recipients) or
1147 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1148 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities using the bounded recipient rule of
# fetch_activities_bounded_query/3 on top of fetch_activities_query/2 (called
# with an empty recipient list, so no generic recipient filter is applied).
1152 def fetch_activities_bounded(
1154 recipients_with_public,
1156 pagination \\ :keyset
1158 fetch_activities_query([], opts)
1159 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1160 |> Pagination.fetch_paginated(opts, pagination)
# Stores the file via Upload.store/2 and inserts an %Object{} holding the
# returned data, with "actor" added when opts[:actor] is present.
1164 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1165 def upload(file, opts \\ []) do
1166 with {:ok, data} <- Upload.store(file, opts) do
1167 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1169 Repo.insert(%Object{data: obj_data})
# Normalises an actor's "url" field to a binary: a binary is returned as-is, a
# map with a binary "href" yields that href, a list is handled by its own
# clause, and anything else yields nil.
1173 @spec get_actor_url(any()) :: binary() | nil
1174 defp get_actor_url(url) when is_binary(url), do: url
1175 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1177 defp get_actor_url(url) when is_list(url) do
1183 defp get_actor_url(_url), do: nil
# Converts a remote actor object (AP JSON map) into the attribute map used to
# build a user. Visible mappings: icon/image URLs wrapped as url lists,
# "PropertyValue" attachments reduced to name/value pairs, "Emoji" tags turned
# into a name => url map (colons trimmed from the name), boolean flags
# defaulting to false, actor type defaulting to "Person", the public key PEM
# and shared inbox read only when present as binaries. The nickname is
# "<preferredUsername>@<host of id>", or nil for actors without a
# preferredUsername.
# NOTE(review): the attachment filter's single clause requires a "type" key;
# an attachment without it would raise FunctionClauseError — confirm inputs.
1185 defp object_to_user_data(data) do
1187 data["icon"]["url"] &&
1190 "url" => [%{"href" => data["icon"]["url"]}]
1194 data["image"]["url"] &&
1197 "url" => [%{"href" => data["image"]["url"]}]
1202 |> Map.get("attachment", [])
1203 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1204 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1208 |> Map.get("tag", [])
1210 %{"type" => "Emoji"} -> true
1213 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1214 {String.trim(name, ":"), url}
1217 locked = data["manuallyApprovesFollowers"] || false
1218 capabilities = data["capabilities"] || %{}
1219 accepts_chat_messages = capabilities["acceptsChatMessages"]
1220 data = Transmogrifier.maybe_fix_user_object(data)
1221 discoverable = data["discoverable"] || false
1222 invisible = data["invisible"] || false
1223 actor_type = data["type"] || "Person"
1226 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1227 data["publicKey"]["publicKeyPem"]
1233 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1234 data["endpoints"]["sharedInbox"]
1241 uri: get_actor_url(data["url"]),
1247 discoverable: discoverable,
1248 invisible: invisible,
1251 follower_address: data["followers"],
1252 following_address: data["following"],
1253 bio: data["summary"],
1254 actor_type: actor_type,
1255 also_known_as: Map.get(data, "alsoKnownAs", []),
1256 public_key: public_key,
1257 inbox: data["inbox"],
1258 shared_inbox: shared_inbox,
1259 accepts_chat_messages: accepts_chat_messages
1262 # nickname can be nil because of virtual actors
1263 if data["preferredUsername"] do
1267 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1270 Map.put(user_data, :nickname, nil)
# Fetches the user's remote following and follower collections and derives
# hide_follows / hide_followers (via collection_private/1) plus the follower
# and following counts from "totalItems" (non-integers become 0). {:error, _}
# results are passed through.
1274 def fetch_follow_information_for_user(user) do
1275 with {:ok, following_data} <-
1276 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1277 {:ok, hide_follows} <- collection_private(following_data),
1278 {:ok, followers_data} <-
1279 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1280 {:ok, hide_followers} <- collection_private(followers_data) do
1283 hide_follows: hide_follows,
1284 follower_count: normalize_counter(followers_data["totalItems"]),
1285 following_count: normalize_counter(following_data["totalItems"]),
1286 hide_followers: hide_followers
1289 {:error, _} = e -> e
# Returns the counter when it is an integer; any other value becomes 0.
1294 defp normalize_counter(counter) when is_integer(counter), do: counter
1295 defp normalize_counter(_), do: 0
# Merges freshly fetched follow information into user_data[:info], but only
# when external user synchronization is enabled, user_data[:type] is "Person"
# or "Service", and both the following and follower addresses are present.
# Each failed precondition has its own fall-through clause; fetch failures are
# reported with a message naming user_data.ap_id.
1297 def maybe_update_follow_information(user_data) do
1298 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1299 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1301 {:collections_available,
1302 !!(user_data[:following_address] && user_data[:follower_address])},
1304 fetch_follow_information_for_user(user_data) do
1305 info = Map.merge(user_data[:info] || %{}, info)
1308 |> Map.put(:info, info)
1310 {:user_type_check, false} ->
1313 {:collections_available, false} ->
1316 {:enabled, false} ->
1321 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a remote collection is private, returning {:ok, boolean} or
# an {:error, _} tuple. An embedded "first" page of a collection-page type is
# handled by the first clause; a "first" reference is fetched, and a 401/403
# response is treated as private ({:ok, true}). Data without "first" is
# considered private.
1328 defp collection_private(%{"first" => %{"type" => type}})
1329 when type in ["CollectionPage", "OrderedCollectionPage"],
1332 defp collection_private(%{"first" => first}) do
1333 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1334 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1337 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1338 {:error, _} = e -> e
1343 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through the MRF filter and, when accepted, converts it
# with object_to_user_data/1, returning {:ok, user_data}.
1345 def user_data_from_user_object(data) do
1346 with {:ok, data} <- MRF.filter(data) do
1347 {:ok, object_to_user_data(data)}
# Fetches the remote actor for `ap_id`, converts it to user data and optionally
# enriches it with follow information, returning {:ok, data}. Failures are
# logged at a level matching their cause: debug for a deleted object, info for
# an MRF rejection, error for anything else.
1353 def fetch_and_prepare_user_from_ap_id(ap_id) do
1354 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1355 {:ok, data} <- user_data_from_user_object(data) do
1356 {:ok, maybe_update_follow_information(data)}
1358 {:error, "Object has been deleted" = e} ->
1359 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1362 {:error, {:reject, reason} = e} ->
1363 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1367 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Resolves a nickname clash before a new remote user is stored. If another user
# already holds data[:nickname] under a different ap_id, the old user is
# renamed to "<old id>.<old nickname>". If the ap_ids are equal nothing is
# changed and the situation is only reported.
1372 def maybe_handle_clashing_nickname(data) do
1373 nickname = data[:nickname]
1375 with %User{} = old_user <- User.get_by_nickname(nickname),
1376 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1378 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1384 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1385 |> User.update_and_set_cache()
1387 {:ap_id_comparison, true} ->
1389 "Found an old user for #{nickname}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Creates or refreshes a user from an ap_id. An existing cached user that is
# not AP-enabled is upgraded through Transmogrifier. Otherwise the remote data
# is fetched and applied via User.remote_user_changeset, after
# maybe_handle_clashing_nickname/1 has dealt with nickname clashes where
# needed.
1397 def make_user_from_ap_id(ap_id) do
1398 user = User.get_cached_by_ap_id(ap_id)
1400 if user && !User.ap_enabled?(user) do
1401 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1403 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1406 |> User.remote_user_changeset(data)
1407 |> User.update_and_set_cache()
1409 maybe_handle_clashing_nickname(data)
1412 |> User.remote_user_changeset()
# Resolves the nickname through WebFinger and builds the user from the returned
# ap_id. Any other WebFinger result yields {:error, "No AP id in WebFinger"}.
1420 def make_user_from_nickname(nickname) do
1421 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1422 make_user_from_ap_id(ap_id)
1424 _e -> {:error, "No AP id in WebFinger"}
# Returns the result of entire_thread_visible_for_user?/2 for the activity and
# user; used to drop activities from threads the user cannot fully see.
1428 # filter out broken threads
1429 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1430 entire_thread_visible_for_user?(activity, user)
# Public wrapper around contain_broken_threads/2 for a single activity/user
# pair.
1433 # do post-processing on a specific activity
1434 def contain_activity(%Activity{} = activity, %User{} = user) do
1435 contain_broken_threads(activity, user)
# Builds a query for "Create" activities with direct visibility, ordered by id
# ascending.
1438 def fetch_direct_messages_query do
1440 |> restrict_type(%{type: "Create"})
1441 |> restrict_visibility(%{visibility: "direct"})
1442 |> order_by([activity], asc: activity.id)