1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
67 defp check_remote_limit(_), do: true
69 defp increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
88 defp increase_poll_votes_if_vote(%{
89 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
93 Object.increase_vote_count(reply_ap_id, name, actor)
96 defp increase_poll_votes_if_vote(_create_data), do: :noop
98 @object_types ["ChatMessage"]
99 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
100 def persist(%{"type" => type} = object, meta) when type in @object_types do
101 with {:ok, object} <- Object.create(object) do
106 def persist(object, meta) do
107 with local <- Keyword.fetch!(meta, :local),
108 {recipients, _, _} <- get_recipients(object),
110 Repo.insert(%Activity{
113 recipients: recipients,
114 actor: object["actor"]
116 {:ok, activity, meta}
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map) do
136 recipients: recipients
139 |> maybe_create_activity_expiration()
141 # Splice in the child object if we have one.
142 activity = Maps.put_if_present(activity, :object, object)
144 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
148 %Activity{} = activity ->
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
156 recipients: recipients,
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
168 def notify_and_stream(activity) do
169 Notification.create_notifications(activity)
171 conversation = create_or_bump_conversation(activity, activity.actor)
172 participations = get_participations(conversation)
174 stream_out_participations(participations)
177 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
178 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
183 defp maybe_create_activity_expiration(result), do: result
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
193 defp get_participations({:ok, conversation}) do
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
199 defp get_participations(_), do: []
201 def stream_out_participations(participations) do
204 |> Repo.preload(:user)
206 Streamer.stream("participation", participations)
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
214 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
225 def stream_out_participations(_, _), do: :noop
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
234 def stream_out(_activity) do
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # only accept false as false value
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
254 %{to: to, actor: actor, published: published, context: context, object: object},
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
268 {:quick_insert, true, activity} ->
271 {:fake, true, activity} ->
275 Repo.rollback(message)
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
288 %{to: to, actor: actor, published: published, context: context, object: object},
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
299 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
300 def accept(params) do
301 accept_or_reject("Accept", params)
304 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
305 def reject(params) do
306 accept_or_reject("Reject", params)
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
325 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
326 {:ok, Activity.t()} | nil | {:error, any()}
327 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
334 defp do_unfollow(follower, followed, activity_id, local) do
335 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
336 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
337 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
338 {:ok, activity} <- insert(unfollow_data, local),
339 _ <- notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
344 {:error, error} -> Repo.rollback(error)
348 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
358 # only accept false as false value
359 local = !(params[:local] == false)
360 forward = !(params[:forward] == false)
362 additional = params[:additional] || %{}
366 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
368 Map.merge(additional, %{"to" => [], "cc" => []})
371 with flag_data <- make_flag_data(params, additional),
372 {:ok, activity} <- insert(flag_data, local),
373 {:ok, stripped_activity} <- strip_report_status_data(activity),
374 _ <- notify_and_stream(activity),
375 :ok <- maybe_federate(stripped_activity) do
376 User.all_superusers()
377 |> Enum.filter(fn user -> not is_nil(user.email) end)
378 |> Enum.each(fn superuser ->
380 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
381 |> Pleroma.Emails.Mailer.deliver_async()
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
479 @valid_visibilities ~w[direct unlisted public private]
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
607 godmode: params[:godmode],
608 reading_user: reading_user
610 |> user_activities_recipients()
611 |> fetch_activities(params)
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
619 godmode: params[:godmode],
620 reading_user: reading_user
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
627 defp user_activities_recipients(%{godmode: true}), do: []
629 defp user_activities_recipients(%{reading_user: reading_user}) do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
633 [Constants.as_public()]
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
643 [activity, object] in query,
646 "?->>'type' != ? or ?->>'actor' != ?",
655 defp restrict_announce_object_actor(query, _), do: query
657 defp restrict_since(query, %{since_id: ""}), do: query
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
663 defp restrict_since(query, _), do: query
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise "Can't use the child object without preloading!"
669 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
676 defp restrict_tag_reject(query, _), do: query
678 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
679 raise "Can't use the child object without preloading!"
682 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
689 defp restrict_tag_all(query, _), do: query
691 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
692 raise "Can't use the child object without preloading!"
695 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
697 [_activity, object] in query,
698 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
702 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
709 defp restrict_tag(query, _), do: query
711 defp restrict_recipients(query, [], _user), do: query
713 defp restrict_recipients(query, recipients, nil) do
714 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
717 defp restrict_recipients(query, recipients, user) do
720 where: fragment("? && ?", ^recipients, activity.recipients),
721 or_where: activity.actor == ^user.ap_id
725 defp restrict_local(query, %{local_only: true}) do
726 from(activity in query, where: activity.local == true)
729 defp restrict_local(query, _), do: query
731 defp restrict_actor(query, %{actor_id: actor_id}) do
732 from(activity in query, where: activity.actor == ^actor_id)
735 defp restrict_actor(query, _), do: query
737 defp restrict_type(query, %{type: type}) when is_binary(type) do
738 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
741 defp restrict_type(query, %{type: type}) do
742 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
745 defp restrict_type(query, _), do: query
747 defp restrict_state(query, %{state: state}) do
748 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
751 defp restrict_state(query, _), do: query
753 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
755 [_activity, object] in query,
756 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
760 defp restrict_favorited_by(query, _), do: query
762 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
763 raise "Can't use the child object without preloading!"
766 defp restrict_media(query, %{only_media: true}) do
768 [activity, object] in query,
769 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
770 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
774 defp restrict_media(query, _), do: query
776 defp restrict_replies(query, %{exclude_replies: true}) do
778 [_activity, object] in query,
779 where: fragment("?->>'inReplyTo' is null", object.data)
783 defp restrict_replies(query, %{
784 reply_filtering_user: user,
785 reply_visibility: "self"
788 [activity, object] in query,
791 "?->>'inReplyTo' is null OR ? = ANY(?)",
799 defp restrict_replies(query, %{
800 reply_filtering_user: user,
801 reply_visibility: "following"
804 [activity, object] in query,
807 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
809 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
818 defp restrict_replies(query, _), do: query
820 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
821 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
824 defp restrict_reblogs(query, _), do: query
826 defp restrict_muted(query, %{with_muted: true}), do: query
828 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
829 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
832 from([activity] in query,
833 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
834 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
837 unless opts[:skip_preload] do
838 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
844 defp restrict_muted(query, _), do: query
846 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
847 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
848 domain_blocks = user.domain_blocks || []
850 following_ap_ids = User.get_friends_ap_ids(user)
853 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
856 [activity, object: o] in query,
857 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
858 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
861 "recipients_contain_blocked_domains(?, ?) = false",
867 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
874 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
882 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
891 defp restrict_blocked(query, _), do: query
893 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
898 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
900 ^[Constants.as_public()]
905 defp restrict_unlisted(query, _), do: query
907 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
908 from(activity in query, where: activity.id in ^ids)
911 defp restrict_pinned(query, _), do: query
913 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
914 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
920 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
928 defp restrict_muted_reblogs(query, _), do: query
930 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
933 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
937 defp restrict_instance(query, _), do: query
939 defp restrict_filtered(query, %{user: %User{} = user}) do
940 case Filter.compose_regex(user) do
945 from([activity, object] in query,
947 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
948 activity.actor == ^user.ap_id
953 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
954 restrict_filtered(query, %{user: user})
957 defp restrict_filtered(query, _), do: query
959 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
961 defp exclude_poll_votes(query, _) do
962 if has_named_binding?(query, :object) do
963 from([activity, object: o] in query,
964 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
971 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
973 defp exclude_chat_messages(query, _) do
974 if has_named_binding?(query, :object) do
975 from([activity, object: o] in query,
976 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
983 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
985 defp exclude_invisible_actors(query, _opts) do
987 User.Query.build(%{invisible: true, select: [:ap_id]})
989 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
991 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
994 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
995 from(activity in query, where: activity.id != ^id)
998 defp exclude_id(query, _), do: query
1000 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1002 defp maybe_preload_objects(query, _) do
1004 |> Activity.with_preloaded_object()
1007 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1009 defp maybe_preload_bookmarks(query, opts) do
1011 |> Activity.with_preloaded_bookmark(opts[:user])
1014 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1016 |> Activity.with_preloaded_report_notes()
1019 defp maybe_preload_report_notes(query, _), do: query
1021 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1023 defp maybe_set_thread_muted_field(query, opts) do
1025 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1028 defp maybe_order(query, %{order: :desc}) do
1030 |> order_by(desc: :id)
1033 defp maybe_order(query, %{order: :asc}) do
1035 |> order_by(asc: :id)
1038 defp maybe_order(query, _), do: query
1040 defp fetch_activities_query_ap_ids_ops(opts) do
1041 source_user = opts[:muting_user]
1042 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1044 ap_id_relationships =
1045 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1046 [:block | ap_id_relationships]
1051 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1053 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1054 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1056 restrict_muted_reblogs_opts =
1057 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1059 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1062 def fetch_activities_query(recipients, opts \\ %{}) do
1063 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1064 fetch_activities_query_ap_ids_ops(opts)
1067 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1071 |> maybe_preload_objects(opts)
1072 |> maybe_preload_bookmarks(opts)
1073 |> maybe_preload_report_notes(opts)
1074 |> maybe_set_thread_muted_field(opts)
1075 |> maybe_order(opts)
1076 |> restrict_recipients(recipients, opts[:user])
1077 |> restrict_replies(opts)
1078 |> restrict_tag(opts)
1079 |> restrict_tag_reject(opts)
1080 |> restrict_tag_all(opts)
1081 |> restrict_since(opts)
1082 |> restrict_local(opts)
1083 |> restrict_actor(opts)
1084 |> restrict_type(opts)
1085 |> restrict_state(opts)
1086 |> restrict_favorited_by(opts)
1087 |> restrict_blocked(restrict_blocked_opts)
1088 |> restrict_muted(restrict_muted_opts)
1089 |> restrict_filtered(opts)
1090 |> restrict_media(opts)
1091 |> restrict_visibility(opts)
1092 |> restrict_thread_visibility(opts, config)
1093 |> restrict_reblogs(opts)
1094 |> restrict_pinned(opts)
1095 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1096 |> restrict_instance(opts)
1097 |> restrict_announce_object_actor(opts)
1098 |> restrict_filtered(opts)
1099 |> Activity.restrict_deactivated_users()
1100 |> exclude_poll_votes(opts)
1101 |> exclude_chat_messages(opts)
1102 |> exclude_invisible_actors(opts)
1103 |> exclude_visibility(opts)
1106 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1107 list_memberships = Pleroma.List.memberships(opts[:user])
1109 fetch_activities_query(recipients ++ list_memberships, opts)
1110 |> Pagination.fetch_paginated(opts, pagination)
1112 |> maybe_update_cc(list_memberships, opts[:user])
1116 Fetch favorites activities of user with order by sort adds to favorites
1118 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1119 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1121 |> Activity.Queries.by_actor()
1122 |> Activity.Queries.by_type("Like")
1123 |> Activity.with_joined_object()
1124 |> Object.with_joined_activity()
1125 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1126 |> order_by([like, _, _], desc_nulls_last: like.id)
1127 |> Pagination.fetch_paginated(
1128 Map.merge(params, %{skip_order: true}),
1133 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1134 Enum.map(activities, fn
1135 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1136 if Enum.any?(bcc, &(&1 in list_memberships)) do
1137 update_in(activity.data["cc"], &[user_ap_id | &1])
1147 defp maybe_update_cc(activities, _, _), do: activities
1149 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1150 from(activity in query,
1152 fragment("? && ?", activity.recipients, ^recipients) or
1153 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1154 ^Constants.as_public() in activity.recipients)
1158 def fetch_activities_bounded(
1160 recipients_with_public,
1162 pagination \\ :keyset
1164 fetch_activities_query([], opts)
1165 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1166 |> Pagination.fetch_paginated(opts, pagination)
1170 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1171 def upload(file, opts \\ []) do
1172 with {:ok, data} <- Upload.store(file, opts) do
1173 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1175 Repo.insert(%Object{data: obj_data})
1179 @spec get_actor_url(any()) :: binary() | nil
1180 defp get_actor_url(url) when is_binary(url), do: url
1181 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1183 defp get_actor_url(url) when is_list(url) do
1189 defp get_actor_url(_url), do: nil
1191 defp object_to_user_data(data) do
1193 data["icon"]["url"] &&
1196 "url" => [%{"href" => data["icon"]["url"]}]
1200 data["image"]["url"] &&
1203 "url" => [%{"href" => data["image"]["url"]}]
1208 |> Map.get("attachment", [])
1209 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1210 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1214 |> Map.get("tag", [])
1216 %{"type" => "Emoji"} -> true
1219 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1220 {String.trim(name, ":"), url}
1223 locked = data["manuallyApprovesFollowers"] || false
1224 capabilities = data["capabilities"] || %{}
1225 accepts_chat_messages = capabilities["acceptsChatMessages"]
1226 data = Transmogrifier.maybe_fix_user_object(data)
1227 discoverable = data["discoverable"] || false
1228 invisible = data["invisible"] || false
1229 actor_type = data["type"] || "Person"
1232 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1233 data["publicKey"]["publicKeyPem"]
1239 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1240 data["endpoints"]["sharedInbox"]
1247 uri: get_actor_url(data["url"]),
1253 discoverable: discoverable,
1254 invisible: invisible,
1257 follower_address: data["followers"],
1258 following_address: data["following"],
1259 bio: data["summary"],
1260 actor_type: actor_type,
1261 also_known_as: Map.get(data, "alsoKnownAs", []),
1262 public_key: public_key,
1263 inbox: data["inbox"],
1264 shared_inbox: shared_inbox,
1265 accepts_chat_messages: accepts_chat_messages
1268 # nickname can be nil because of virtual actors
1269 if data["preferredUsername"] do
1273 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1276 Map.put(user_data, :nickname, nil)
1280 def fetch_follow_information_for_user(user) do
1281 with {:ok, following_data} <-
1282 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1283 {:ok, hide_follows} <- collection_private(following_data),
1284 {:ok, followers_data} <-
1285 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1286 {:ok, hide_followers} <- collection_private(followers_data) do
1289 hide_follows: hide_follows,
1290 follower_count: normalize_counter(followers_data["totalItems"]),
1291 following_count: normalize_counter(following_data["totalItems"]),
1292 hide_followers: hide_followers
1295 {:error, _} = e -> e
1300 defp normalize_counter(counter) when is_integer(counter), do: counter
1301 defp normalize_counter(_), do: 0
1303 def maybe_update_follow_information(user_data) do
1304 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1305 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1307 {:collections_available,
1308 !!(user_data[:following_address] && user_data[:follower_address])},
1310 fetch_follow_information_for_user(user_data) do
1311 info = Map.merge(user_data[:info] || %{}, info)
1314 |> Map.put(:info, info)
1316 {:user_type_check, false} ->
1319 {:collections_available, false} ->
1322 {:enabled, false} ->
1327 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1334 defp collection_private(%{"first" => %{"type" => type}})
1335 when type in ["CollectionPage", "OrderedCollectionPage"],
1338 defp collection_private(%{"first" => first}) do
1339 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1340 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1343 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1344 {:error, _} = e -> e
1349 defp collection_private(_data), do: {:ok, true}
1351 def user_data_from_user_object(data) do
1352 with {:ok, data} <- MRF.filter(data) do
1353 {:ok, object_to_user_data(data)}
1359 def fetch_and_prepare_user_from_ap_id(ap_id) do
1360 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1361 {:ok, data} <- user_data_from_user_object(data) do
1362 {:ok, maybe_update_follow_information(data)}
1364 {:error, "Object has been deleted" = e} ->
1365 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1369 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1374 def maybe_handle_clashing_nickname(data) do
1375 nickname = data[:nickname]
1377 with %User{} = old_user <- User.get_by_nickname(nickname),
1378 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1380 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1386 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1387 |> User.update_and_set_cache()
1389 {:ap_id_comparison, true} ->
1391 "Found an old user for #{nickname}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1399 def make_user_from_ap_id(ap_id) do
1400 user = User.get_cached_by_ap_id(ap_id)
1402 if user && !User.ap_enabled?(user) do
1403 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1405 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1408 |> User.remote_user_changeset(data)
1409 |> User.update_and_set_cache()
1411 maybe_handle_clashing_nickname(data)
1414 |> User.remote_user_changeset()
1422 def make_user_from_nickname(nickname) do
1423 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1424 make_user_from_ap_id(ap_id)
1426 _e -> {:error, "No AP id in WebFinger"}
1430 # filter out broken threads
1431 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1432 entire_thread_visible_for_user?(activity, user)
1435 # do post-processing on a specific activity
1436 def contain_activity(%Activity{} = activity, %User{} = user) do
1437 contain_broken_threads(activity, user)
1440 def fetch_direct_messages_query do
1442 |> restrict_type(%{type: "Create"})
1443 |> restrict_visibility(%{visibility: "direct"})
1444 |> order_by([activity], asc: activity.id)