1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
67 defp check_remote_limit(_), do: true
69 def increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
88 @object_types ["ChatMessage", "Question", "Answer"]
89 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
90 def persist(%{"type" => type} = object, meta) when type in @object_types do
91 with {:ok, object} <- Object.create(object) do
96 def persist(object, meta) do
97 with local <- Keyword.fetch!(meta, :local),
98 {recipients, _, _} <- get_recipients(object),
100 Repo.insert(%Activity{
103 recipients: recipients,
104 actor: object["actor"]
106 {:ok, activity, meta}
110 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
111 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
112 with nil <- Activity.normalize(map),
113 map <- lazy_put_activity_defaults(map, fake),
114 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
115 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
116 {:ok, map} <- MRF.filter(map),
117 {recipients, _, _} = get_recipients(map),
118 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
119 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
120 {:ok, map, object} <- insert_full_object(map) do
126 recipients: recipients
129 |> maybe_create_activity_expiration()
131 # Splice in the child object if we have one.
132 activity = Maps.put_if_present(activity, :object, object)
134 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
138 %Activity{} = activity ->
141 {:fake, true, map, recipients} ->
142 activity = %Activity{
146 recipients: recipients,
150 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 def notify_and_stream(activity) do
159 Notification.create_notifications(activity)
161 conversation = create_or_bump_conversation(activity, activity.actor)
162 participations = get_participations(conversation)
164 stream_out_participations(participations)
167 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
168 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
173 defp maybe_create_activity_expiration(result), do: result
175 defp create_or_bump_conversation(activity, actor) do
176 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
177 %User{} = user <- User.get_cached_by_ap_id(actor) do
178 Participation.mark_as_read(user, conversation)
183 defp get_participations({:ok, conversation}) do
185 |> Repo.preload(:participations, force: true)
186 |> Map.get(:participations)
189 defp get_participations(_), do: []
191 def stream_out_participations(participations) do
194 |> Repo.preload(:user)
196 Streamer.stream("participation", participations)
199 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
200 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
201 conversation = Repo.preload(conversation, :participations)
204 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
209 if last_activity_id do
210 stream_out_participations(conversation.participations)
215 def stream_out_participations(_, _), do: :noop
217 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
218 when data_type in ["Create", "Announce", "Delete"] do
220 |> Topics.get_activity_topics()
221 |> Streamer.stream(activity)
224 def stream_out(_activity) do
228 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
229 def create(params, fake \\ false) do
230 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
235 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
236 additional = params[:additional] || %{}
237 # only accept false as false value
238 local = !(params[:local] == false)
239 published = params[:published]
240 quick_insert? = Config.get([:env]) == :benchmark
244 %{to: to, actor: actor, published: published, context: context, object: object},
248 with {:ok, activity} <- insert(create_data, local, fake),
249 {:fake, false, activity} <- {:fake, fake, activity},
250 _ <- increase_replies_count_if_reply(create_data),
251 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
252 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
253 _ <- notify_and_stream(activity),
254 :ok <- maybe_federate(activity) do
257 {:quick_insert, true, activity} ->
260 {:fake, true, activity} ->
264 Repo.rollback(message)
268 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
269 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(listen_data, local),
282 _ <- notify_and_stream(activity),
283 :ok <- maybe_federate(activity) do
288 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
289 def reject(params) do
290 accept_or_reject("Reject", params)
293 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
294 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
295 local = Map.get(params, :local, true)
296 activity_id = Map.get(params, :activity_id, nil)
299 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
300 |> Maps.put_if_present("id", activity_id)
302 with {:ok, activity} <- insert(data, local),
303 _ <- notify_and_stream(activity),
304 :ok <- maybe_federate(activity) do
309 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
310 {:ok, Activity.t()} | nil | {:error, any()}
311 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
312 with {:ok, result} <-
313 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
318 defp do_unfollow(follower, followed, activity_id, local) do
319 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
320 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
321 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
322 {:ok, activity} <- insert(unfollow_data, local),
323 _ <- notify_and_stream(activity),
324 :ok <- maybe_federate(activity) do
328 {:error, error} -> Repo.rollback(error)
332 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
342 # only accept false as false value
343 local = !(params[:local] == false)
344 forward = !(params[:forward] == false)
346 additional = params[:additional] || %{}
350 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
352 Map.merge(additional, %{"to" => [], "cc" => []})
355 with flag_data <- make_flag_data(params, additional),
356 {:ok, activity} <- insert(flag_data, local),
357 {:ok, stripped_activity} <- strip_report_status_data(activity),
358 _ <- notify_and_stream(activity),
359 :ok <- maybe_federate(stripped_activity) do
360 User.all_superusers()
361 |> Enum.filter(fn user -> not is_nil(user.email) end)
362 |> Enum.each(fn superuser ->
364 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
365 |> Pleroma.Emails.Mailer.deliver_async()
372 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
373 def move(%User{} = origin, %User{} = target, local \\ true) do
376 "actor" => origin.ap_id,
377 "object" => origin.ap_id,
378 "target" => target.ap_id
381 with true <- origin.ap_id in target.also_known_as,
382 {:ok, activity} <- insert(params, local),
383 _ <- notify_and_stream(activity) do
384 maybe_federate(activity)
386 BackgroundWorker.enqueue("move_following", %{
387 "origin_id" => origin.id,
388 "target_id" => target.id
393 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
398 def fetch_activities_for_context_query(context, opts) do
399 public = [Constants.as_public()]
403 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
406 from(activity in Activity)
407 |> maybe_preload_objects(opts)
408 |> maybe_preload_bookmarks(opts)
409 |> maybe_set_thread_muted_field(opts)
410 |> restrict_blocked(opts)
411 |> restrict_recipients(recipients, opts[:user])
412 |> restrict_filtered(opts)
416 "?->>'type' = ? and ?->>'context' = ?",
423 |> exclude_poll_votes(opts)
425 |> order_by([activity], desc: activity.id)
428 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
429 def fetch_activities_for_context(context, opts \\ %{}) do
431 |> fetch_activities_for_context_query(opts)
435 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
436 FlakeId.Ecto.CompatType.t() | nil
437 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
439 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
440 |> restrict_visibility(%{visibility: "direct"})
446 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
447 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
448 opts = Map.delete(opts, :user)
450 [Constants.as_public()]
451 |> fetch_activities_query(opts)
452 |> restrict_unlisted(opts)
453 |> Pagination.fetch_paginated(opts, pagination)
456 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
457 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
459 |> Map.put(:restrict_unlisted, true)
460 |> fetch_public_or_unlisted_activities(pagination)
463 @valid_visibilities ~w[direct unlisted public private]
465 defp restrict_visibility(query, %{visibility: visibility})
466 when is_list(visibility) do
467 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
472 "activity_visibility(?, ?, ?) = ANY (?)",
480 Logger.error("Could not restrict visibility to #{visibility}")
484 defp restrict_visibility(query, %{visibility: visibility})
485 when visibility in @valid_visibilities do
489 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
493 defp restrict_visibility(_query, %{visibility: visibility})
494 when visibility not in @valid_visibilities do
495 Logger.error("Could not restrict visibility to #{visibility}")
498 defp restrict_visibility(query, _visibility), do: query
500 defp exclude_visibility(query, %{exclude_visibilities: visibility})
501 when is_list(visibility) do
502 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
507 "activity_visibility(?, ?, ?) = ANY (?)",
515 Logger.error("Could not exclude visibility to #{visibility}")
520 defp exclude_visibility(query, %{exclude_visibilities: visibility})
521 when visibility in @valid_visibilities do
526 "activity_visibility(?, ?, ?) = ?",
535 defp exclude_visibility(query, %{exclude_visibilities: visibility})
536 when visibility not in [nil | @valid_visibilities] do
537 Logger.error("Could not exclude visibility to #{visibility}")
541 defp exclude_visibility(query, _visibility), do: query
543 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
546 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
549 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
552 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
556 defp restrict_thread_visibility(query, _, _), do: query
558 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
561 |> Map.put(:user, reading_user)
562 |> Map.put(:actor_id, user.ap_id)
565 godmode: params[:godmode],
566 reading_user: reading_user
568 |> user_activities_recipients()
569 |> fetch_activities(params)
573 def fetch_user_activities(user, reading_user, params \\ %{}) do
576 |> Map.put(:type, ["Create", "Announce"])
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
579 |> Map.put(:pinned_activity_ids, user.pinned_activities)
582 if User.blocks?(reading_user, user) do
586 |> Map.put(:blocking_user, reading_user)
587 |> Map.put(:muting_user, reading_user)
591 godmode: params[:godmode],
592 reading_user: reading_user
594 |> user_activities_recipients()
595 |> fetch_activities(params)
599 def fetch_statuses(reading_user, params) do
600 params = Map.put(params, :type, ["Create", "Announce"])
603 godmode: params[:godmode],
604 reading_user: reading_user
606 |> user_activities_recipients()
607 |> fetch_activities(params, :offset)
611 defp user_activities_recipients(%{godmode: true}), do: []
613 defp user_activities_recipients(%{reading_user: reading_user}) do
615 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
617 [Constants.as_public()]
621 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
622 raise "Can't use the child object without preloading!"
625 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
627 [activity, object] in query,
630 "?->>'type' != ? or ?->>'actor' != ?",
639 defp restrict_announce_object_actor(query, _), do: query
641 defp restrict_since(query, %{since_id: ""}), do: query
643 defp restrict_since(query, %{since_id: since_id}) do
644 from(activity in query, where: activity.id > ^since_id)
647 defp restrict_since(query, _), do: query
649 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
650 raise "Can't use the child object without preloading!"
653 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
655 [_activity, object] in query,
656 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
660 defp restrict_tag_reject(query, _), do: query
662 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
663 raise "Can't use the child object without preloading!"
666 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
668 [_activity, object] in query,
669 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
673 defp restrict_tag_all(query, _), do: query
675 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
676 raise "Can't use the child object without preloading!"
679 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
681 [_activity, object] in query,
682 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
686 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
688 [_activity, object] in query,
689 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
693 defp restrict_tag(query, _), do: query
695 defp restrict_recipients(query, [], _user), do: query
697 defp restrict_recipients(query, recipients, nil) do
698 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
701 defp restrict_recipients(query, recipients, user) do
704 where: fragment("? && ?", ^recipients, activity.recipients),
705 or_where: activity.actor == ^user.ap_id
709 defp restrict_local(query, %{local_only: true}) do
710 from(activity in query, where: activity.local == true)
713 defp restrict_local(query, _), do: query
715 defp restrict_actor(query, %{actor_id: actor_id}) do
716 from(activity in query, where: activity.actor == ^actor_id)
719 defp restrict_actor(query, _), do: query
721 defp restrict_type(query, %{type: type}) when is_binary(type) do
722 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
725 defp restrict_type(query, %{type: type}) do
726 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
729 defp restrict_type(query, _), do: query
731 defp restrict_state(query, %{state: state}) do
732 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
735 defp restrict_state(query, _), do: query
737 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
739 [_activity, object] in query,
740 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
744 defp restrict_favorited_by(query, _), do: query
746 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
747 raise "Can't use the child object without preloading!"
750 defp restrict_media(query, %{only_media: true}) do
752 [activity, object] in query,
753 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
754 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
758 defp restrict_media(query, _), do: query
760 defp restrict_replies(query, %{exclude_replies: true}) do
762 [_activity, object] in query,
763 where: fragment("?->>'inReplyTo' is null", object.data)
767 defp restrict_replies(query, %{
768 reply_filtering_user: user,
769 reply_visibility: "self"
772 [activity, object] in query,
775 "?->>'inReplyTo' is null OR ? = ANY(?)",
783 defp restrict_replies(query, %{
784 reply_filtering_user: user,
785 reply_visibility: "following"
788 [activity, object] in query,
791 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
793 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
802 defp restrict_replies(query, _), do: query
804 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
805 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
808 defp restrict_reblogs(query, _), do: query
810 defp restrict_muted(query, %{with_muted: true}), do: query
812 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
813 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
816 from([activity] in query,
817 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
818 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
821 unless opts[:skip_preload] do
822 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
828 defp restrict_muted(query, _), do: query
830 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
831 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
832 domain_blocks = user.domain_blocks || []
834 following_ap_ids = User.get_friends_ap_ids(user)
837 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
840 [activity, object: o] in query,
841 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
842 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
845 "recipients_contain_blocked_domains(?, ?) = false",
851 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
858 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
866 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
875 defp restrict_blocked(query, _), do: query
877 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
882 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
884 ^[Constants.as_public()]
889 defp restrict_unlisted(query, _), do: query
891 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
892 from(activity in query, where: activity.id in ^ids)
895 defp restrict_pinned(query, _), do: query
897 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
898 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
904 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
912 defp restrict_muted_reblogs(query, _), do: query
914 defp restrict_instance(query, %{instance: instance}) do
919 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
923 from(activity in query, where: activity.actor in ^users)
926 defp restrict_instance(query, _), do: query
928 defp restrict_filtered(query, %{user: %User{} = user}) do
929 case Filter.compose_regex(user) do
934 from([activity, object] in query,
936 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
937 activity.actor == ^user.ap_id
942 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
943 restrict_filtered(query, %{user: user})
946 defp restrict_filtered(query, _), do: query
948 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
950 defp exclude_poll_votes(query, _) do
951 if has_named_binding?(query, :object) do
952 from([activity, object: o] in query,
953 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
960 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
962 defp exclude_chat_messages(query, _) do
963 if has_named_binding?(query, :object) do
964 from([activity, object: o] in query,
965 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
972 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
974 defp exclude_invisible_actors(query, _opts) do
976 User.Query.build(%{invisible: true, select: [:ap_id]})
978 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
980 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
983 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
984 from(activity in query, where: activity.id != ^id)
987 defp exclude_id(query, _), do: query
989 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
991 defp maybe_preload_objects(query, _) do
993 |> Activity.with_preloaded_object()
996 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
998 defp maybe_preload_bookmarks(query, opts) do
1000 |> Activity.with_preloaded_bookmark(opts[:user])
1003 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1005 |> Activity.with_preloaded_report_notes()
1008 defp maybe_preload_report_notes(query, _), do: query
1010 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1012 defp maybe_set_thread_muted_field(query, opts) do
1014 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1017 defp maybe_order(query, %{order: :desc}) do
1019 |> order_by(desc: :id)
1022 defp maybe_order(query, %{order: :asc}) do
1024 |> order_by(asc: :id)
1027 defp maybe_order(query, _), do: query
1029 defp fetch_activities_query_ap_ids_ops(opts) do
1030 source_user = opts[:muting_user]
1031 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1033 ap_id_relationships =
1034 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1035 [:block | ap_id_relationships]
1040 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1042 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1043 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1045 restrict_muted_reblogs_opts =
1046 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1048 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1051 def fetch_activities_query(recipients, opts \\ %{}) do
1052 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1053 fetch_activities_query_ap_ids_ops(opts)
1056 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1060 |> maybe_preload_objects(opts)
1061 |> maybe_preload_bookmarks(opts)
1062 |> maybe_preload_report_notes(opts)
1063 |> maybe_set_thread_muted_field(opts)
1064 |> maybe_order(opts)
1065 |> restrict_recipients(recipients, opts[:user])
1066 |> restrict_replies(opts)
1067 |> restrict_tag(opts)
1068 |> restrict_tag_reject(opts)
1069 |> restrict_tag_all(opts)
1070 |> restrict_since(opts)
1071 |> restrict_local(opts)
1072 |> restrict_actor(opts)
1073 |> restrict_type(opts)
1074 |> restrict_state(opts)
1075 |> restrict_favorited_by(opts)
1076 |> restrict_blocked(restrict_blocked_opts)
1077 |> restrict_muted(restrict_muted_opts)
1078 |> restrict_filtered(opts)
1079 |> restrict_media(opts)
1080 |> restrict_visibility(opts)
1081 |> restrict_thread_visibility(opts, config)
1082 |> restrict_reblogs(opts)
1083 |> restrict_pinned(opts)
1084 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1085 |> restrict_instance(opts)
1086 |> restrict_announce_object_actor(opts)
1087 |> restrict_filtered(opts)
1088 |> Activity.restrict_deactivated_users()
1089 |> exclude_poll_votes(opts)
1090 |> exclude_chat_messages(opts)
1091 |> exclude_invisible_actors(opts)
1092 |> exclude_visibility(opts)
1095 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1096 list_memberships = Pleroma.List.memberships(opts[:user])
1098 fetch_activities_query(recipients ++ list_memberships, opts)
1099 |> Pagination.fetch_paginated(opts, pagination)
1101 |> maybe_update_cc(list_memberships, opts[:user])
1105 Fetch favorites activities of user with order by sort adds to favorites
1107 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1108 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1110 |> Activity.Queries.by_actor()
1111 |> Activity.Queries.by_type("Like")
1112 |> Activity.with_joined_object()
1113 |> Object.with_joined_activity()
1114 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1115 |> order_by([like, _, _], desc_nulls_last: like.id)
1116 |> Pagination.fetch_paginated(
1117 Map.merge(params, %{skip_order: true}),
1122 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1123 Enum.map(activities, fn
1124 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1125 if Enum.any?(bcc, &(&1 in list_memberships)) do
1126 update_in(activity.data["cc"], &[user_ap_id | &1])
1136 defp maybe_update_cc(activities, _, _), do: activities
1138 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1139 from(activity in query,
1141 fragment("? && ?", activity.recipients, ^recipients) or
1142 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1143 ^Constants.as_public() in activity.recipients)
1147 def fetch_activities_bounded(
1149 recipients_with_public,
1151 pagination \\ :keyset
1153 fetch_activities_query([], opts)
1154 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1155 |> Pagination.fetch_paginated(opts, pagination)
1159 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1160 def upload(file, opts \\ []) do
1161 with {:ok, data} <- Upload.store(file, opts) do
1162 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1164 Repo.insert(%Object{data: obj_data})
1168 @spec get_actor_url(any()) :: binary() | nil
1169 defp get_actor_url(url) when is_binary(url), do: url
1170 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1172 defp get_actor_url(url) when is_list(url) do
1178 defp get_actor_url(_url), do: nil
1180 defp object_to_user_data(data) do
1182 data["icon"]["url"] &&
1185 "url" => [%{"href" => data["icon"]["url"]}]
1189 data["image"]["url"] &&
1192 "url" => [%{"href" => data["image"]["url"]}]
1197 |> Map.get("attachment", [])
1198 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1199 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1203 |> Map.get("tag", [])
1205 %{"type" => "Emoji"} -> true
1208 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1209 {String.trim(name, ":"), url}
1212 locked = data["manuallyApprovesFollowers"] || false
1213 capabilities = data["capabilities"] || %{}
1214 accepts_chat_messages = capabilities["acceptsChatMessages"]
1215 data = Transmogrifier.maybe_fix_user_object(data)
1216 discoverable = data["discoverable"] || false
1217 invisible = data["invisible"] || false
1218 actor_type = data["type"] || "Person"
1221 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1222 data["publicKey"]["publicKeyPem"]
1228 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1229 data["endpoints"]["sharedInbox"]
1236 uri: get_actor_url(data["url"]),
1242 discoverable: discoverable,
1243 invisible: invisible,
1246 follower_address: data["followers"],
1247 following_address: data["following"],
1248 bio: data["summary"],
1249 actor_type: actor_type,
1250 also_known_as: Map.get(data, "alsoKnownAs", []),
1251 public_key: public_key,
1252 inbox: data["inbox"],
1253 shared_inbox: shared_inbox,
1254 accepts_chat_messages: accepts_chat_messages
1257 # nickname can be nil because of virtual actors
1258 if data["preferredUsername"] do
1262 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1265 Map.put(user_data, :nickname, nil)
1269 def fetch_follow_information_for_user(user) do
1270 with {:ok, following_data} <-
1271 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1272 {:ok, hide_follows} <- collection_private(following_data),
1273 {:ok, followers_data} <-
1274 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1275 {:ok, hide_followers} <- collection_private(followers_data) do
1278 hide_follows: hide_follows,
1279 follower_count: normalize_counter(followers_data["totalItems"]),
1280 following_count: normalize_counter(following_data["totalItems"]),
1281 hide_followers: hide_followers
1284 {:error, _} = e -> e
1289 defp normalize_counter(counter) when is_integer(counter), do: counter
1290 defp normalize_counter(_), do: 0
1292 def maybe_update_follow_information(user_data) do
1293 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1294 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1296 {:collections_available,
1297 !!(user_data[:following_address] && user_data[:follower_address])},
1299 fetch_follow_information_for_user(user_data) do
1300 info = Map.merge(user_data[:info] || %{}, info)
1303 |> Map.put(:info, info)
1305 {:user_type_check, false} ->
1308 {:collections_available, false} ->
1311 {:enabled, false} ->
1316 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1323 defp collection_private(%{"first" => %{"type" => type}})
1324 when type in ["CollectionPage", "OrderedCollectionPage"],
1327 defp collection_private(%{"first" => first}) do
1328 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1329 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1332 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1333 {:error, _} = e -> e
1338 defp collection_private(_data), do: {:ok, true}
1340 def user_data_from_user_object(data) do
1341 with {:ok, data} <- MRF.filter(data) do
1342 {:ok, object_to_user_data(data)}
1348 def fetch_and_prepare_user_from_ap_id(ap_id) do
1349 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1350 {:ok, data} <- user_data_from_user_object(data) do
1351 {:ok, maybe_update_follow_information(data)}
1353 {:error, "Object has been deleted" = e} ->
1354 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1357 {:error, {:reject, reason} = e} ->
1358 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1362 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1367 def maybe_handle_clashing_nickname(data) do
1368 nickname = data[:nickname]
1370 with %User{} = old_user <- User.get_by_nickname(nickname),
1371 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1373 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1379 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1380 |> User.update_and_set_cache()
1382 {:ap_id_comparison, true} ->
1384 "Found an old user for #{nickname}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1392 def make_user_from_ap_id(ap_id) do
1393 user = User.get_cached_by_ap_id(ap_id)
1395 if user && !User.ap_enabled?(user) do
1396 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1398 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1401 |> User.remote_user_changeset(data)
1402 |> User.update_and_set_cache()
1404 maybe_handle_clashing_nickname(data)
1407 |> User.remote_user_changeset()
1415 def make_user_from_nickname(nickname) do
1416 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1417 make_user_from_ap_id(ap_id)
1419 _e -> {:error, "No AP id in WebFinger"}
1423 # filter out broken threads
1424 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1425 entire_thread_visible_for_user?(activity, user)
1428 # do post-processing on a specific activity
1429 def contain_activity(%Activity{} = activity, %User{} = user) do
1430 contain_broken_threads(activity, user)
1433 def fetch_direct_messages_query do
1435 |> restrict_type(%{type: "Create"})
1436 |> restrict_visibility(%{visibility: "direct"})
1437 |> order_by([activity], asc: activity.id)