1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
67 defp check_remote_limit(_), do: true
69 defp increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
88 defp increase_poll_votes_if_vote(%{
89 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
93 Object.increase_vote_count(reply_ap_id, name, actor)
96 defp increase_poll_votes_if_vote(_create_data), do: :noop
98 @object_types ["ChatMessage"]
99 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
100 def persist(%{"type" => type} = object, meta) when type in @object_types do
101 with {:ok, object} <- Object.create(object) do
106 def persist(object, meta) do
107 with local <- Keyword.fetch!(meta, :local),
108 {recipients, _, _} <- get_recipients(object),
110 Repo.insert(%Activity{
113 recipients: recipients,
114 actor: object["actor"]
116 {:ok, activity, meta}
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map) do
136 recipients: recipients
139 |> maybe_create_activity_expiration()
141 # Splice in the child object if we have one.
142 activity = Maps.put_if_present(activity, :object, object)
144 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
148 %Activity{} = activity ->
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
156 recipients: recipients,
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
168 def notify_and_stream(activity) do
169 Notification.create_notifications(activity)
171 conversation = create_or_bump_conversation(activity, activity.actor)
172 participations = get_participations(conversation)
174 stream_out_participations(participations)
177 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
178 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
183 defp maybe_create_activity_expiration(result), do: result
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
193 defp get_participations({:ok, conversation}) do
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
199 defp get_participations(_), do: []
201 def stream_out_participations(participations) do
204 |> Repo.preload(:user)
206 Streamer.stream("participation", participations)
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
214 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
225 def stream_out_participations(_, _), do: :noop
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
234 def stream_out(_activity) do
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # only accept false as false value
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
254 %{to: to, actor: actor, published: published, context: context, object: object},
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
268 {:quick_insert, true, activity} ->
271 {:fake, true, activity} ->
275 Repo.rollback(message)
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
288 %{to: to, actor: actor, published: published, context: context, object: object},
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
299 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
300 def accept(params) do
301 accept_or_reject("Accept", params)
304 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
305 def reject(params) do
306 accept_or_reject("Reject", params)
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
325 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
326 {:ok, Activity.t()} | nil | {:error, any()}
327 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
334 defp do_unfollow(follower, followed, activity_id, local) do
335 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
336 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
337 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
338 {:ok, activity} <- insert(unfollow_data, local),
339 _ <- notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
344 {:error, error} -> Repo.rollback(error)
348 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
358 # only accept false as false value
359 local = !(params[:local] == false)
360 forward = !(params[:forward] == false)
362 additional = params[:additional] || %{}
366 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
368 Map.merge(additional, %{"to" => [], "cc" => []})
371 with flag_data <- make_flag_data(params, additional),
372 {:ok, activity} <- insert(flag_data, local),
373 {:ok, stripped_activity} <- strip_report_status_data(activity),
374 _ <- notify_and_stream(activity),
375 :ok <- maybe_federate(stripped_activity) do
376 User.all_superusers()
377 |> Enum.filter(fn user -> not is_nil(user.email) end)
378 |> Enum.each(fn superuser ->
380 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
381 |> Pleroma.Emails.Mailer.deliver_async()
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
479 @valid_visibilities ~w[direct unlisted public private]
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
607 godmode: params[:godmode],
608 reading_user: reading_user
610 |> user_activities_recipients()
611 |> fetch_activities(params)
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
619 godmode: params[:godmode],
620 reading_user: reading_user
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
627 defp user_activities_recipients(%{godmode: true}), do: []
629 defp user_activities_recipients(%{reading_user: reading_user}) do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
633 [Constants.as_public()]
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
643 [activity, object] in query,
646 "?->>'type' != ? or ?->>'actor' != ?",
655 defp restrict_announce_object_actor(query, _), do: query
657 defp restrict_since(query, %{since_id: ""}), do: query
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
663 defp restrict_since(query, _), do: query
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise "Can't use the child object without preloading!"
669 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
676 defp restrict_tag_reject(query, _), do: query
678 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
679 raise "Can't use the child object without preloading!"
682 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
689 defp restrict_tag_all(query, _), do: query
691 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
692 raise "Can't use the child object without preloading!"
695 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
697 [_activity, object] in query,
698 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
702 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
709 defp restrict_tag(query, _), do: query
711 defp restrict_recipients(query, [], _user), do: query
713 defp restrict_recipients(query, recipients, nil) do
714 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
717 defp restrict_recipients(query, recipients, user) do
720 where: fragment("? && ?", ^recipients, activity.recipients),
721 or_where: activity.actor == ^user.ap_id
725 defp restrict_local(query, %{local_only: true}) do
726 from(activity in query, where: activity.local == true)
729 defp restrict_local(query, _), do: query
731 defp restrict_actor(query, %{actor_id: actor_id}) do
732 from(activity in query, where: activity.actor == ^actor_id)
735 defp restrict_actor(query, _), do: query
737 defp restrict_type(query, %{type: type}) when is_binary(type) do
738 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
741 defp restrict_type(query, %{type: type}) do
742 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
745 defp restrict_type(query, _), do: query
747 defp restrict_state(query, %{state: state}) do
748 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
751 defp restrict_state(query, _), do: query
753 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
755 [_activity, object] in query,
756 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
760 defp restrict_favorited_by(query, _), do: query
762 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
763 raise "Can't use the child object without preloading!"
766 defp restrict_media(query, %{only_media: true}) do
768 [activity, object] in query,
769 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
770 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
774 defp restrict_media(query, _), do: query
776 defp restrict_replies(query, %{exclude_replies: true}) do
778 [_activity, object] in query,
779 where: fragment("?->>'inReplyTo' is null", object.data)
783 defp restrict_replies(query, %{
784 reply_filtering_user: user,
785 reply_visibility: "self"
788 [activity, object] in query,
791 "?->>'inReplyTo' is null OR ? = ANY(?)",
799 defp restrict_replies(query, %{
800 reply_filtering_user: user,
801 reply_visibility: "following"
804 [activity, object] in query,
807 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
809 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
818 defp restrict_replies(query, _), do: query
820 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
821 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
824 defp restrict_reblogs(query, _), do: query
826 defp restrict_muted(query, %{with_muted: true}), do: query
828 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
829 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
832 from([activity] in query,
833 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
834 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
837 unless opts[:skip_preload] do
838 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
844 defp restrict_muted(query, _), do: query
846 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
847 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
848 domain_blocks = user.domain_blocks || []
850 following_ap_ids = User.get_friends_ap_ids(user)
853 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
856 [activity, object: o] in query,
857 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
858 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
861 "recipients_contain_blocked_domains(?, ?) = false",
867 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
874 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
882 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
891 defp restrict_blocked(query, _), do: query
893 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
898 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
900 ^[Constants.as_public()]
905 defp restrict_unlisted(query, _), do: query
907 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
908 from(activity in query, where: activity.id in ^ids)
911 defp restrict_pinned(query, _), do: query
913 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
914 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
920 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
928 defp restrict_muted_reblogs(query, _), do: query
930 defp restrict_instance(query, %{instance: instance}) do
935 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
939 from(activity in query, where: activity.actor in ^users)
942 defp restrict_instance(query, _), do: query
944 defp restrict_filtered(query, %{user: %User{} = user}) do
945 case Filter.compose_regex(user) do
950 from([activity, object] in query,
952 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
953 activity.actor == ^user.ap_id
958 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
959 restrict_filtered(query, %{user: user})
962 defp restrict_filtered(query, _), do: query
964 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
966 defp exclude_poll_votes(query, _) do
967 if has_named_binding?(query, :object) do
968 from([activity, object: o] in query,
969 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
976 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
978 defp exclude_chat_messages(query, _) do
979 if has_named_binding?(query, :object) do
980 from([activity, object: o] in query,
981 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
988 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
990 defp exclude_invisible_actors(query, _opts) do
992 User.Query.build(%{invisible: true, select: [:ap_id]})
994 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
996 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
999 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1000 from(activity in query, where: activity.id != ^id)
1003 defp exclude_id(query, _), do: query
1005 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1007 defp maybe_preload_objects(query, _) do
1009 |> Activity.with_preloaded_object()
1012 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1014 defp maybe_preload_bookmarks(query, opts) do
1016 |> Activity.with_preloaded_bookmark(opts[:user])
1019 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1021 |> Activity.with_preloaded_report_notes()
1024 defp maybe_preload_report_notes(query, _), do: query
1026 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1028 defp maybe_set_thread_muted_field(query, opts) do
1030 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1033 defp maybe_order(query, %{order: :desc}) do
1035 |> order_by(desc: :id)
1038 defp maybe_order(query, %{order: :asc}) do
1040 |> order_by(asc: :id)
1043 defp maybe_order(query, _), do: query
1045 defp fetch_activities_query_ap_ids_ops(opts) do
1046 source_user = opts[:muting_user]
1047 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1049 ap_id_relationships =
1050 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1051 [:block | ap_id_relationships]
1056 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1058 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1059 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1061 restrict_muted_reblogs_opts =
1062 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1064 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1067 def fetch_activities_query(recipients, opts \\ %{}) do
1068 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1069 fetch_activities_query_ap_ids_ops(opts)
1072 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1076 |> maybe_preload_objects(opts)
1077 |> maybe_preload_bookmarks(opts)
1078 |> maybe_preload_report_notes(opts)
1079 |> maybe_set_thread_muted_field(opts)
1080 |> maybe_order(opts)
1081 |> restrict_recipients(recipients, opts[:user])
1082 |> restrict_replies(opts)
1083 |> restrict_tag(opts)
1084 |> restrict_tag_reject(opts)
1085 |> restrict_tag_all(opts)
1086 |> restrict_since(opts)
1087 |> restrict_local(opts)
1088 |> restrict_actor(opts)
1089 |> restrict_type(opts)
1090 |> restrict_state(opts)
1091 |> restrict_favorited_by(opts)
1092 |> restrict_blocked(restrict_blocked_opts)
1093 |> restrict_muted(restrict_muted_opts)
1094 |> restrict_filtered(opts)
1095 |> restrict_media(opts)
1096 |> restrict_visibility(opts)
1097 |> restrict_thread_visibility(opts, config)
1098 |> restrict_reblogs(opts)
1099 |> restrict_pinned(opts)
1100 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1101 |> restrict_instance(opts)
1102 |> restrict_announce_object_actor(opts)
1103 |> restrict_filtered(opts)
1104 |> Activity.restrict_deactivated_users()
1105 |> exclude_poll_votes(opts)
1106 |> exclude_chat_messages(opts)
1107 |> exclude_invisible_actors(opts)
1108 |> exclude_visibility(opts)
1111 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1112 list_memberships = Pleroma.List.memberships(opts[:user])
1114 fetch_activities_query(recipients ++ list_memberships, opts)
1115 |> Pagination.fetch_paginated(opts, pagination)
1117 |> maybe_update_cc(list_memberships, opts[:user])
1121 Fetch favorites activities of user with order by sort adds to favorites
1123 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1124 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1126 |> Activity.Queries.by_actor()
1127 |> Activity.Queries.by_type("Like")
1128 |> Activity.with_joined_object()
1129 |> Object.with_joined_activity()
1130 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1131 |> order_by([like, _, _], desc_nulls_last: like.id)
1132 |> Pagination.fetch_paginated(
1133 Map.merge(params, %{skip_order: true}),
1138 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1139 Enum.map(activities, fn
1140 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1141 if Enum.any?(bcc, &(&1 in list_memberships)) do
1142 update_in(activity.data["cc"], &[user_ap_id | &1])
1152 defp maybe_update_cc(activities, _, _), do: activities
1154 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1155 from(activity in query,
1157 fragment("? && ?", activity.recipients, ^recipients) or
1158 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1159 ^Constants.as_public() in activity.recipients)
1163 def fetch_activities_bounded(
1165 recipients_with_public,
1167 pagination \\ :keyset
1169 fetch_activities_query([], opts)
1170 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1171 |> Pagination.fetch_paginated(opts, pagination)
1175 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1176 def upload(file, opts \\ []) do
1177 with {:ok, data} <- Upload.store(file, opts) do
1178 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1180 Repo.insert(%Object{data: obj_data})
1184 @spec get_actor_url(any()) :: binary() | nil
1185 defp get_actor_url(url) when is_binary(url), do: url
1186 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1188 defp get_actor_url(url) when is_list(url) do
1194 defp get_actor_url(_url), do: nil
1196 defp object_to_user_data(data) do
1198 data["icon"]["url"] &&
1201 "url" => [%{"href" => data["icon"]["url"]}]
1205 data["image"]["url"] &&
1208 "url" => [%{"href" => data["image"]["url"]}]
1213 |> Map.get("attachment", [])
1214 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1215 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1219 |> Map.get("tag", [])
1221 %{"type" => "Emoji"} -> true
1224 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1225 {String.trim(name, ":"), url}
1228 locked = data["manuallyApprovesFollowers"] || false
1229 capabilities = data["capabilities"] || %{}
1230 accepts_chat_messages = capabilities["acceptsChatMessages"]
1231 data = Transmogrifier.maybe_fix_user_object(data)
1232 discoverable = data["discoverable"] || false
1233 invisible = data["invisible"] || false
1234 actor_type = data["type"] || "Person"
1237 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1238 data["publicKey"]["publicKeyPem"]
1244 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1245 data["endpoints"]["sharedInbox"]
1252 uri: get_actor_url(data["url"]),
1258 discoverable: discoverable,
1259 invisible: invisible,
1262 follower_address: data["followers"],
1263 following_address: data["following"],
1264 bio: data["summary"],
1265 actor_type: actor_type,
1266 also_known_as: Map.get(data, "alsoKnownAs", []),
1267 public_key: public_key,
1268 inbox: data["inbox"],
1269 shared_inbox: shared_inbox,
1270 accepts_chat_messages: accepts_chat_messages
1273 # nickname can be nil because of virtual actors
1274 if data["preferredUsername"] do
1278 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1281 Map.put(user_data, :nickname, nil)
1285 def fetch_follow_information_for_user(user) do
1286 with {:ok, following_data} <-
1287 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1288 {:ok, hide_follows} <- collection_private(following_data),
1289 {:ok, followers_data} <-
1290 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1291 {:ok, hide_followers} <- collection_private(followers_data) do
1294 hide_follows: hide_follows,
1295 follower_count: normalize_counter(followers_data["totalItems"]),
1296 following_count: normalize_counter(following_data["totalItems"]),
1297 hide_followers: hide_followers
1300 {:error, _} = e -> e
1305 defp normalize_counter(counter) when is_integer(counter), do: counter
1306 defp normalize_counter(_), do: 0
1308 def maybe_update_follow_information(user_data) do
1309 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1310 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1312 {:collections_available,
1313 !!(user_data[:following_address] && user_data[:follower_address])},
1315 fetch_follow_information_for_user(user_data) do
1316 info = Map.merge(user_data[:info] || %{}, info)
1319 |> Map.put(:info, info)
1321 {:user_type_check, false} ->
1324 {:collections_available, false} ->
1327 {:enabled, false} ->
1332 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1339 defp collection_private(%{"first" => %{"type" => type}})
1340 when type in ["CollectionPage", "OrderedCollectionPage"],
1343 defp collection_private(%{"first" => first}) do
1344 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1345 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1348 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1349 {:error, _} = e -> e
1354 defp collection_private(_data), do: {:ok, true}
1356 def user_data_from_user_object(data) do
1357 with {:ok, data} <- MRF.filter(data) do
1358 {:ok, object_to_user_data(data)}
1364 def fetch_and_prepare_user_from_ap_id(ap_id) do
1365 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1366 {:ok, data} <- user_data_from_user_object(data) do
1367 {:ok, maybe_update_follow_information(data)}
1369 {:error, "Object has been deleted" = e} ->
1370 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1373 {:error, {:reject, reason} = e} ->
1374 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1378 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1383 def maybe_handle_clashing_nickname(data) do
1384 nickname = data[:nickname]
1386 with %User{} = old_user <- User.get_by_nickname(nickname),
1387 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1389 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1395 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1396 |> User.update_and_set_cache()
1398 {:ap_id_comparison, true} ->
1400 "Found an old user for #{nickname}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1408 def make_user_from_ap_id(ap_id) do
1409 user = User.get_cached_by_ap_id(ap_id)
1411 if user && !User.ap_enabled?(user) do
1412 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1414 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1417 |> User.remote_user_changeset(data)
1418 |> User.update_and_set_cache()
1420 maybe_handle_clashing_nickname(data)
1423 |> User.remote_user_changeset()
1431 def make_user_from_nickname(nickname) do
1432 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1433 make_user_from_ap_id(ap_id)
1435 _e -> {:error, "No AP id in WebFinger"}
1439 # filter out broken threads
1440 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1441 entire_thread_visible_for_user?(activity, user)
1444 # do post-processing on a specific activity
1445 def contain_activity(%Activity{} = activity, %User{} = user) do
1446 contain_broken_threads(activity, user)
1449 def fetch_direct_messages_query do
1451 |> restrict_type(%{type: "Create"})
1452 |> restrict_visibility(%{visibility: "direct"})
1453 |> order_by([activity], asc: activity.id)