1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
67 defp check_remote_limit(_), do: true
69 defp increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
88 defp increase_poll_votes_if_vote(%{
89 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
93 Object.increase_vote_count(reply_ap_id, name, actor)
96 defp increase_poll_votes_if_vote(_create_data), do: :noop
98 @object_types ["ChatMessage"]
99 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
100 def persist(%{"type" => type} = object, meta) when type in @object_types do
101 with {:ok, object} <- Object.create(object) do
106 def persist(object, meta) do
107 with local <- Keyword.fetch!(meta, :local),
108 {recipients, _, _} <- get_recipients(object),
110 Repo.insert(%Activity{
113 recipients: recipients,
114 actor: object["actor"]
116 {:ok, activity, meta}
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map) do
136 recipients: recipients
139 |> maybe_create_activity_expiration()
141 # Splice in the child object if we have one.
142 activity = Maps.put_if_present(activity, :object, object)
144 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
148 %Activity{} = activity ->
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
156 recipients: recipients,
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
168 def notify_and_stream(activity) do
169 Notification.create_notifications(activity)
171 conversation = create_or_bump_conversation(activity, activity.actor)
172 participations = get_participations(conversation)
174 stream_out_participations(participations)
177 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
178 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
183 defp maybe_create_activity_expiration(result), do: result
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
193 defp get_participations({:ok, conversation}) do
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
199 defp get_participations(_), do: []
201 def stream_out_participations(participations) do
204 |> Repo.preload(:user)
206 Streamer.stream("participation", participations)
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
214 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
225 def stream_out_participations(_, _), do: :noop
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
234 def stream_out(_activity) do
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # only accept false as false value
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
254 %{to: to, actor: actor, published: published, context: context, object: object},
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
268 {:quick_insert, true, activity} ->
271 {:fake, true, activity} ->
275 Repo.rollback(message)
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
288 %{to: to, actor: actor, published: published, context: context, object: object},
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
299 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
300 def accept(params) do
301 accept_or_reject("Accept", params)
304 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
305 def reject(params) do
306 accept_or_reject("Reject", params)
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
325 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
326 {:ok, Activity.t()} | {:error, any()}
327 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
334 defp do_follow(follower, followed, activity_id, local, opts) do
335 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
336 data = make_follow_data(follower, followed, activity_id)
338 with {:ok, activity} <- insert(data, local),
339 _ <- skip_notify_and_stream || notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
343 {:error, error} -> Repo.rollback(error)
347 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
348 {:ok, Activity.t()} | nil | {:error, any()}
349 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
350 with {:ok, result} <-
351 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
356 defp do_unfollow(follower, followed, activity_id, local) do
357 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
358 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
359 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
360 {:ok, activity} <- insert(unfollow_data, local),
361 _ <- notify_and_stream(activity),
362 :ok <- maybe_federate(activity) do
366 {:error, error} -> Repo.rollback(error)
370 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
380 # only accept false as false value
381 local = !(params[:local] == false)
382 forward = !(params[:forward] == false)
384 additional = params[:additional] || %{}
388 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
390 Map.merge(additional, %{"to" => [], "cc" => []})
393 with flag_data <- make_flag_data(params, additional),
394 {:ok, activity} <- insert(flag_data, local),
395 {:ok, stripped_activity} <- strip_report_status_data(activity),
396 _ <- notify_and_stream(activity),
397 :ok <- maybe_federate(stripped_activity) do
398 User.all_superusers()
399 |> Enum.filter(fn user -> not is_nil(user.email) end)
400 |> Enum.each(fn superuser ->
402 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
403 |> Pleroma.Emails.Mailer.deliver_async()
410 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
411 def move(%User{} = origin, %User{} = target, local \\ true) do
414 "actor" => origin.ap_id,
415 "object" => origin.ap_id,
416 "target" => target.ap_id
419 with true <- origin.ap_id in target.also_known_as,
420 {:ok, activity} <- insert(params, local),
421 _ <- notify_and_stream(activity) do
422 maybe_federate(activity)
424 BackgroundWorker.enqueue("move_following", %{
425 "origin_id" => origin.id,
426 "target_id" => target.id
431 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
436 def fetch_activities_for_context_query(context, opts) do
437 public = [Constants.as_public()]
441 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
444 from(activity in Activity)
445 |> maybe_preload_objects(opts)
446 |> maybe_preload_bookmarks(opts)
447 |> maybe_set_thread_muted_field(opts)
448 |> restrict_blocked(opts)
449 |> restrict_recipients(recipients, opts[:user])
453 "?->>'type' = ? and ?->>'context' = ?",
460 |> exclude_poll_votes(opts)
462 |> order_by([activity], desc: activity.id)
465 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
466 def fetch_activities_for_context(context, opts \\ %{}) do
468 |> fetch_activities_for_context_query(opts)
472 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
473 FlakeId.Ecto.CompatType.t() | nil
474 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
476 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
477 |> restrict_visibility(%{visibility: "direct"})
483 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
484 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
485 opts = Map.delete(opts, :user)
487 [Constants.as_public()]
488 |> fetch_activities_query(opts)
489 |> restrict_unlisted(opts)
490 |> Pagination.fetch_paginated(opts, pagination)
493 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
494 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
496 |> Map.put(:restrict_unlisted, true)
497 |> fetch_public_or_unlisted_activities(pagination)
500 @valid_visibilities ~w[direct unlisted public private]
502 defp restrict_visibility(query, %{visibility: visibility})
503 when is_list(visibility) do
504 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
509 "activity_visibility(?, ?, ?) = ANY (?)",
517 Logger.error("Could not restrict visibility to #{visibility}")
521 defp restrict_visibility(query, %{visibility: visibility})
522 when visibility in @valid_visibilities do
526 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
530 defp restrict_visibility(_query, %{visibility: visibility})
531 when visibility not in @valid_visibilities do
532 Logger.error("Could not restrict visibility to #{visibility}")
535 defp restrict_visibility(query, _visibility), do: query
537 defp exclude_visibility(query, %{exclude_visibilities: visibility})
538 when is_list(visibility) do
539 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
544 "activity_visibility(?, ?, ?) = ANY (?)",
552 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility in @valid_visibilities do
563 "activity_visibility(?, ?, ?) = ?",
572 defp exclude_visibility(query, %{exclude_visibilities: visibility})
573 when visibility not in [nil | @valid_visibilities] do
574 Logger.error("Could not exclude visibility to #{visibility}")
578 defp exclude_visibility(query, _visibility), do: query
580 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
583 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
586 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
589 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
593 defp restrict_thread_visibility(query, _, _), do: query
595 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
602 godmode: params[:godmode],
603 reading_user: reading_user
605 |> user_activities_recipients()
606 |> fetch_activities(params)
610 def fetch_user_activities(user, reading_user, params \\ %{}) do
613 |> Map.put(:type, ["Create", "Announce"])
614 |> Map.put(:user, reading_user)
615 |> Map.put(:actor_id, user.ap_id)
616 |> Map.put(:pinned_activity_ids, user.pinned_activities)
619 if User.blocks?(reading_user, user) do
623 |> Map.put(:blocking_user, reading_user)
624 |> Map.put(:muting_user, reading_user)
628 godmode: params[:godmode],
629 reading_user: reading_user
631 |> user_activities_recipients()
632 |> fetch_activities(params)
636 def fetch_statuses(reading_user, params) do
637 params = Map.put(params, :type, ["Create", "Announce"])
640 godmode: params[:godmode],
641 reading_user: reading_user
643 |> user_activities_recipients()
644 |> fetch_activities(params, :offset)
648 defp user_activities_recipients(%{godmode: true}), do: []
650 defp user_activities_recipients(%{reading_user: reading_user}) do
652 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
654 [Constants.as_public()]
658 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
659 raise "Can't use the child object without preloading!"
662 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
664 [activity, object] in query,
667 "?->>'type' != ? or ?->>'actor' != ?",
676 defp restrict_announce_object_actor(query, _), do: query
678 defp restrict_since(query, %{since_id: ""}), do: query
680 defp restrict_since(query, %{since_id: since_id}) do
681 from(activity in query, where: activity.id > ^since_id)
684 defp restrict_since(query, _), do: query
686 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
690 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
692 [_activity, object] in query,
693 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
697 defp restrict_tag_reject(query, _), do: query
699 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
700 raise "Can't use the child object without preloading!"
703 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
710 defp restrict_tag_all(query, _), do: query
712 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
713 raise "Can't use the child object without preloading!"
716 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
718 [_activity, object] in query,
719 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
723 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
725 [_activity, object] in query,
726 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
730 defp restrict_tag(query, _), do: query
732 defp restrict_recipients(query, [], _user), do: query
734 defp restrict_recipients(query, recipients, nil) do
735 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
738 defp restrict_recipients(query, recipients, user) do
741 where: fragment("? && ?", ^recipients, activity.recipients),
742 or_where: activity.actor == ^user.ap_id
746 defp restrict_local(query, %{local_only: true}) do
747 from(activity in query, where: activity.local == true)
750 defp restrict_local(query, _), do: query
752 defp restrict_actor(query, %{actor_id: actor_id}) do
753 from(activity in query, where: activity.actor == ^actor_id)
756 defp restrict_actor(query, _), do: query
758 defp restrict_type(query, %{type: type}) when is_binary(type) do
759 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
762 defp restrict_type(query, %{type: type}) do
763 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
766 defp restrict_type(query, _), do: query
768 defp restrict_state(query, %{state: state}) do
769 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
772 defp restrict_state(query, _), do: query
774 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
776 [_activity, object] in query,
777 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
781 defp restrict_favorited_by(query, _), do: query
783 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
784 raise "Can't use the child object without preloading!"
787 defp restrict_media(query, %{only_media: true}) do
789 [activity, object] in query,
790 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
791 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
795 defp restrict_media(query, _), do: query
797 defp restrict_replies(query, %{exclude_replies: true}) do
799 [_activity, object] in query,
800 where: fragment("?->>'inReplyTo' is null", object.data)
804 defp restrict_replies(query, %{
805 reply_filtering_user: user,
806 reply_visibility: "self"
809 [activity, object] in query,
812 "?->>'inReplyTo' is null OR ? = ANY(?)",
820 defp restrict_replies(query, %{
821 reply_filtering_user: user,
822 reply_visibility: "following"
825 [activity, object] in query,
828 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
830 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
839 defp restrict_replies(query, _), do: query
841 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
842 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
845 defp restrict_reblogs(query, _), do: query
847 defp restrict_muted(query, %{with_muted: true}), do: query
849 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
850 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
853 from([activity] in query,
854 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
855 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
858 unless opts[:skip_preload] do
859 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
865 defp restrict_muted(query, _), do: query
867 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
868 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
869 domain_blocks = user.domain_blocks || []
871 following_ap_ids = User.get_friends_ap_ids(user)
874 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
877 [activity, object: o] in query,
878 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
879 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
882 "recipients_contain_blocked_domains(?, ?) = false",
888 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
895 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
903 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
912 defp restrict_blocked(query, _), do: query
914 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
919 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
921 ^[Constants.as_public()]
926 defp restrict_unlisted(query, _), do: query
928 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
929 from(activity in query, where: activity.id in ^ids)
932 defp restrict_pinned(query, _), do: query
934 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
935 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
941 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
949 defp restrict_muted_reblogs(query, _), do: query
951 defp restrict_instance(query, %{instance: instance}) do
956 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
960 from(activity in query, where: activity.actor in ^users)
963 defp restrict_instance(query, _), do: query
965 defp restrict_filtered(query, %{user: %User{} = user}) do
966 case Filter.compose_regex(user) do
971 from([activity, object] in query,
973 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
974 activity.actor == ^user.ap_id
979 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
980 restrict_filtered(query, %{user: user})
983 defp restrict_filtered(query, _), do: query
985 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
987 defp exclude_poll_votes(query, _) do
988 if has_named_binding?(query, :object) do
989 from([activity, object: o] in query,
990 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
997 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
999 defp exclude_chat_messages(query, _) do
1000 if has_named_binding?(query, :object) do
1001 from([activity, object: o] in query,
1002 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1009 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1011 defp exclude_invisible_actors(query, _opts) do
1013 User.Query.build(%{invisible: true, select: [:ap_id]})
1015 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1017 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1020 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1021 from(activity in query, where: activity.id != ^id)
1024 defp exclude_id(query, _), do: query
1026 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1028 defp maybe_preload_objects(query, _) do
1030 |> Activity.with_preloaded_object()
1033 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1035 defp maybe_preload_bookmarks(query, opts) do
1037 |> Activity.with_preloaded_bookmark(opts[:user])
1040 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1042 |> Activity.with_preloaded_report_notes()
1045 defp maybe_preload_report_notes(query, _), do: query
1047 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1049 defp maybe_set_thread_muted_field(query, opts) do
1051 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1054 defp maybe_order(query, %{order: :desc}) do
1056 |> order_by(desc: :id)
1059 defp maybe_order(query, %{order: :asc}) do
1061 |> order_by(asc: :id)
1064 defp maybe_order(query, _), do: query
1066 defp fetch_activities_query_ap_ids_ops(opts) do
1067 source_user = opts[:muting_user]
1068 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1070 ap_id_relationships =
1071 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1072 [:block | ap_id_relationships]
1077 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1079 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1080 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1082 restrict_muted_reblogs_opts =
1083 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1085 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1088 def fetch_activities_query(recipients, opts \\ %{}) do
1089 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1090 fetch_activities_query_ap_ids_ops(opts)
1093 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1097 |> maybe_preload_objects(opts)
1098 |> maybe_preload_bookmarks(opts)
1099 |> maybe_preload_report_notes(opts)
1100 |> maybe_set_thread_muted_field(opts)
1101 |> maybe_order(opts)
1102 |> restrict_recipients(recipients, opts[:user])
1103 |> restrict_replies(opts)
1104 |> restrict_tag(opts)
1105 |> restrict_tag_reject(opts)
1106 |> restrict_tag_all(opts)
1107 |> restrict_since(opts)
1108 |> restrict_local(opts)
1109 |> restrict_actor(opts)
1110 |> restrict_type(opts)
1111 |> restrict_state(opts)
1112 |> restrict_favorited_by(opts)
1113 |> restrict_blocked(restrict_blocked_opts)
1114 |> restrict_muted(restrict_muted_opts)
1115 |> restrict_media(opts)
1116 |> restrict_visibility(opts)
1117 |> restrict_thread_visibility(opts, config)
1118 |> restrict_reblogs(opts)
1119 |> restrict_pinned(opts)
1120 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1121 |> restrict_instance(opts)
1122 |> restrict_announce_object_actor(opts)
1123 |> restrict_filtered(opts)
1124 |> Activity.restrict_deactivated_users()
1125 |> exclude_poll_votes(opts)
1126 |> exclude_chat_messages(opts)
1127 |> exclude_invisible_actors(opts)
1128 |> exclude_visibility(opts)
1131 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1132 list_memberships = Pleroma.List.memberships(opts[:user])
1134 fetch_activities_query(recipients ++ list_memberships, opts)
1135 |> Pagination.fetch_paginated(opts, pagination)
1137 |> maybe_update_cc(list_memberships, opts[:user])
1141 Fetch favorites activities of user with order by sort adds to favorites
1143 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1144 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1146 |> Activity.Queries.by_actor()
1147 |> Activity.Queries.by_type("Like")
1148 |> Activity.with_joined_object()
1149 |> Object.with_joined_activity()
1150 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1151 |> order_by([like, _, _], desc_nulls_last: like.id)
1152 |> Pagination.fetch_paginated(
1153 Map.merge(params, %{skip_order: true}),
1158 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1159 Enum.map(activities, fn
1160 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1161 if Enum.any?(bcc, &(&1 in list_memberships)) do
1162 update_in(activity.data["cc"], &[user_ap_id | &1])
1172 defp maybe_update_cc(activities, _, _), do: activities
1174 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1175 from(activity in query,
1177 fragment("? && ?", activity.recipients, ^recipients) or
1178 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1179 ^Constants.as_public() in activity.recipients)
1183 def fetch_activities_bounded(
1185 recipients_with_public,
1187 pagination \\ :keyset
1189 fetch_activities_query([], opts)
1190 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1191 |> Pagination.fetch_paginated(opts, pagination)
1195 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1196 def upload(file, opts \\ []) do
1197 with {:ok, data} <- Upload.store(file, opts) do
1198 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1200 Repo.insert(%Object{data: obj_data})
1204 @spec get_actor_url(any()) :: binary() | nil
1205 defp get_actor_url(url) when is_binary(url), do: url
1206 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1208 defp get_actor_url(url) when is_list(url) do
1214 defp get_actor_url(_url), do: nil
1216 defp object_to_user_data(data) do
1218 data["icon"]["url"] &&
1221 "url" => [%{"href" => data["icon"]["url"]}]
1225 data["image"]["url"] &&
1228 "url" => [%{"href" => data["image"]["url"]}]
1233 |> Map.get("attachment", [])
1234 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1235 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1239 |> Map.get("tag", [])
1241 %{"type" => "Emoji"} -> true
1244 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1245 {String.trim(name, ":"), url}
1248 locked = data["manuallyApprovesFollowers"] || false
1249 data = Transmogrifier.maybe_fix_user_object(data)
1250 discoverable = data["discoverable"] || false
1251 invisible = data["invisible"] || false
1252 actor_type = data["type"] || "Person"
1255 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1256 data["publicKey"]["publicKeyPem"]
1262 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1263 data["endpoints"]["sharedInbox"]
1270 uri: get_actor_url(data["url"]),
1276 discoverable: discoverable,
1277 invisible: invisible,
1280 follower_address: data["followers"],
1281 following_address: data["following"],
1282 bio: data["summary"],
1283 actor_type: actor_type,
1284 also_known_as: Map.get(data, "alsoKnownAs", []),
1285 public_key: public_key,
1286 inbox: data["inbox"],
1287 shared_inbox: shared_inbox
1290 # nickname can be nil because of virtual actors
1291 if data["preferredUsername"] do
1295 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1298 Map.put(user_data, :nickname, nil)
1302 def fetch_follow_information_for_user(user) do
1303 with {:ok, following_data} <-
1304 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1305 {:ok, hide_follows} <- collection_private(following_data),
1306 {:ok, followers_data} <-
1307 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1308 {:ok, hide_followers} <- collection_private(followers_data) do
1311 hide_follows: hide_follows,
1312 follower_count: normalize_counter(followers_data["totalItems"]),
1313 following_count: normalize_counter(following_data["totalItems"]),
1314 hide_followers: hide_followers
1317 {:error, _} = e -> e
1322 defp normalize_counter(counter) when is_integer(counter), do: counter
1323 defp normalize_counter(_), do: 0
1325 def maybe_update_follow_information(user_data) do
1326 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1327 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1329 {:collections_available,
1330 !!(user_data[:following_address] && user_data[:follower_address])},
1332 fetch_follow_information_for_user(user_data) do
1333 info = Map.merge(user_data[:info] || %{}, info)
1336 |> Map.put(:info, info)
1338 {:user_type_check, false} ->
1341 {:collections_available, false} ->
1344 {:enabled, false} ->
1349 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1356 defp collection_private(%{"first" => %{"type" => type}})
1357 when type in ["CollectionPage", "OrderedCollectionPage"],
1360 defp collection_private(%{"first" => first}) do
1361 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1362 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1365 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1366 {:error, _} = e -> e
1371 defp collection_private(_data), do: {:ok, true}
1373 def user_data_from_user_object(data) do
1374 with {:ok, data} <- MRF.filter(data) do
1375 {:ok, object_to_user_data(data)}
1381 def fetch_and_prepare_user_from_ap_id(ap_id) do
1382 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1383 {:ok, data} <- user_data_from_user_object(data) do
1384 {:ok, maybe_update_follow_information(data)}
1386 {:error, "Object has been deleted" = e} ->
1387 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1391 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1396 def maybe_handle_clashing_nickname(nickname) do
1397 with %User{} = old_user <- User.get_by_nickname(nickname) do
1398 Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")
1401 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1402 |> User.update_and_set_cache()
1406 def make_user_from_ap_id(ap_id) do
1407 user = User.get_cached_by_ap_id(ap_id)
1409 if user && !User.ap_enabled?(user) do
1410 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1412 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1415 |> User.remote_user_changeset(data)
1416 |> User.update_and_set_cache()
1418 maybe_handle_clashing_nickname(data[:nickname])
1421 |> User.remote_user_changeset()
1429 def make_user_from_nickname(nickname) do
1430 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1431 make_user_from_ap_id(ap_id)
1433 _e -> {:error, "No AP id in WebFinger"}
1437 # filter out broken threads
1438 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1439 entire_thread_visible_for_user?(activity, user)
1442 # do post-processing on a specific activity
1443 def contain_activity(%Activity{} = activity, %User{} = user) do
1444 contain_broken_threads(activity, user)
1447 def fetch_direct_messages_query do
1449 |> restrict_type(%{type: "Create"})
1450 |> restrict_visibility(%{visibility: "direct"})
1451 |> order_by([activity], asc: activity.id)