1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
53 defp check_actor_is_active(nil), do: true
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
# Checks a remote activity's object content against the configured size limit.
#
# When the activity carries an `"object"` map whose `"content"` is a binary, the
# content's length (`String.length/1`, i.e. graphemes) is compared with the
# `[:instance, :remote_limit]` setting and the result of `<=` is returned.
#
# Any other shape — no object, no content, `nil` content, or content that is
# not a binary — is not measurable and passes the check (`true`). Guarding on
# `is_binary/1` rather than "not nil" keeps malformed remote payloads (e.g. a
# numeric or map `"content"`) from raising inside `String.length/1`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
# Increments the actor's note count only when `object` is public.
#
# Calls `User.increase_note_count/1` if `is_public?/1` holds for `object`;
# otherwise returns `{:ok, actor}` with the actor unchanged.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Decrements the actor's note count only when `object` is public.
#
# Calls `User.decrease_note_count/1` if `is_public?/1` holds for `object`;
# otherwise returns `{:ok, actor}` with the actor unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
86 defp increase_replies_count_if_reply(_create_data), do: :noop
88 defp increase_poll_votes_if_vote(%{
89 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
93 Object.increase_vote_count(reply_ap_id, name, actor)
96 defp increase_poll_votes_if_vote(_create_data), do: :noop
98 @object_types ["ChatMessage"]
99 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
100 def persist(%{"type" => type} = object, meta) when type in @object_types do
101 with {:ok, object} <- Object.create(object) do
106 def persist(object, meta) do
107 with local <- Keyword.fetch!(meta, :local),
108 {recipients, _, _} <- get_recipients(object),
110 Repo.insert(%Activity{
113 recipients: recipients,
114 actor: object["actor"]
116 {:ok, activity, meta}
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map) do
136 recipients: recipients
139 |> maybe_create_activity_expiration()
141 # Splice in the child object if we have one.
142 activity = Maps.put_if_present(activity, :object, object)
144 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
148 %Activity{} = activity ->
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
156 recipients: recipients,
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
168 def notify_and_stream(activity) do
169 Notification.create_notifications(activity)
171 conversation = create_or_bump_conversation(activity, activity.actor)
172 participations = get_participations(conversation)
174 stream_out_participations(participations)
177 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
178 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
183 defp maybe_create_activity_expiration(result), do: result
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
193 defp get_participations({:ok, conversation}) do
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
199 defp get_participations(_), do: []
201 def stream_out_participations(participations) do
204 |> Repo.preload(:user)
206 Streamer.stream("participation", participations)
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
214 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
225 def stream_out_participations(_, _), do: :noop
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
234 def stream_out(_activity) do
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # only accept false as false value
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
254 %{to: to, actor: actor, published: published, context: context, object: object},
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
268 {:quick_insert, true, activity} ->
271 {:fake, true, activity} ->
275 Repo.rollback(message)
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
288 %{to: to, actor: actor, published: published, context: context, object: object},
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
# Creates an "Accept" activity from `params` by delegating to
# `accept_or_reject/2` with the activity type fixed to "Accept".
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
# Creates a "Reject" activity from `params` by delegating to
# `accept_or_reject/2` with the activity type fixed to "Reject".
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
325 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
326 {:ok, Activity.t()} | {:error, any()}
327 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
334 defp do_follow(follower, followed, activity_id, local, opts) do
335 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
336 data = make_follow_data(follower, followed, activity_id)
338 with {:ok, activity} <- insert(data, local),
339 _ <- skip_notify_and_stream || notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
343 {:error, error} -> Repo.rollback(error)
347 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
348 {:ok, Activity.t()} | nil | {:error, any()}
349 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
350 with {:ok, result} <-
351 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
356 defp do_unfollow(follower, followed, activity_id, local) do
357 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
358 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
359 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
360 {:ok, activity} <- insert(unfollow_data, local),
361 _ <- notify_and_stream(activity),
362 :ok <- maybe_federate(activity) do
366 {:error, error} -> Repo.rollback(error)
370 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
380 # only accept false as false value
381 local = !(params[:local] == false)
382 forward = !(params[:forward] == false)
384 additional = params[:additional] || %{}
388 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
390 Map.merge(additional, %{"to" => [], "cc" => []})
393 with flag_data <- make_flag_data(params, additional),
394 {:ok, activity} <- insert(flag_data, local),
395 {:ok, stripped_activity} <- strip_report_status_data(activity),
396 _ <- notify_and_stream(activity),
397 :ok <- maybe_federate(stripped_activity) do
398 User.all_superusers()
399 |> Enum.filter(fn user -> not is_nil(user.email) end)
400 |> Enum.each(fn superuser ->
402 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
403 |> Pleroma.Emails.Mailer.deliver_async()
410 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
411 def move(%User{} = origin, %User{} = target, local \\ true) do
414 "actor" => origin.ap_id,
415 "object" => origin.ap_id,
416 "target" => target.ap_id
419 with true <- origin.ap_id in target.also_known_as,
420 {:ok, activity} <- insert(params, local),
421 _ <- notify_and_stream(activity) do
422 maybe_federate(activity)
424 BackgroundWorker.enqueue("move_following", %{
425 "origin_id" => origin.id,
426 "target_id" => target.id
431 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
436 def fetch_activities_for_context_query(context, opts) do
437 public = [Constants.as_public()]
441 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
444 from(activity in Activity)
445 |> maybe_preload_objects(opts)
446 |> maybe_preload_bookmarks(opts)
447 |> maybe_set_thread_muted_field(opts)
448 |> restrict_blocked(opts)
449 |> restrict_recipients(recipients, opts[:user])
450 |> restrict_filtered(opts)
454 "?->>'type' = ? and ?->>'context' = ?",
461 |> exclude_poll_votes(opts)
463 |> order_by([activity], desc: activity.id)
466 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
467 def fetch_activities_for_context(context, opts \\ %{}) do
469 |> fetch_activities_for_context_query(opts)
473 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
474 FlakeId.Ecto.CompatType.t() | nil
475 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
477 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
478 |> restrict_visibility(%{visibility: "direct"})
484 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
485 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
486 opts = Map.delete(opts, :user)
488 [Constants.as_public()]
489 |> fetch_activities_query(opts)
490 |> restrict_unlisted(opts)
491 |> Pagination.fetch_paginated(opts, pagination)
494 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
495 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
497 |> Map.put(:restrict_unlisted, true)
498 |> fetch_public_or_unlisted_activities(pagination)
501 @valid_visibilities ~w[direct unlisted public private]
503 defp restrict_visibility(query, %{visibility: visibility})
504 when is_list(visibility) do
505 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
510 "activity_visibility(?, ?, ?) = ANY (?)",
518 Logger.error("Could not restrict visibility to #{visibility}")
522 defp restrict_visibility(query, %{visibility: visibility})
523 when visibility in @valid_visibilities do
527 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
531 defp restrict_visibility(_query, %{visibility: visibility})
532 when visibility not in @valid_visibilities do
533 Logger.error("Could not restrict visibility to #{visibility}")
536 defp restrict_visibility(query, _visibility), do: query
538 defp exclude_visibility(query, %{exclude_visibilities: visibility})
539 when is_list(visibility) do
540 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
545 "activity_visibility(?, ?, ?) = ANY (?)",
553 Logger.error("Could not exclude visibility to #{visibility}")
558 defp exclude_visibility(query, %{exclude_visibilities: visibility})
559 when visibility in @valid_visibilities do
564 "activity_visibility(?, ?, ?) = ?",
573 defp exclude_visibility(query, %{exclude_visibilities: visibility})
574 when visibility not in [nil | @valid_visibilities] do
575 Logger.error("Could not exclude visibility to #{visibility}")
579 defp exclude_visibility(query, _visibility), do: query
581 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
584 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
587 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
590 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
594 defp restrict_thread_visibility(query, _, _), do: query
596 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
599 |> Map.put(:user, reading_user)
600 |> Map.put(:actor_id, user.ap_id)
603 godmode: params[:godmode],
604 reading_user: reading_user
606 |> user_activities_recipients()
607 |> fetch_activities(params)
611 def fetch_user_activities(user, reading_user, params \\ %{}) do
614 |> Map.put(:type, ["Create", "Announce"])
615 |> Map.put(:user, reading_user)
616 |> Map.put(:actor_id, user.ap_id)
617 |> Map.put(:pinned_activity_ids, user.pinned_activities)
620 if User.blocks?(reading_user, user) do
624 |> Map.put(:blocking_user, reading_user)
625 |> Map.put(:muting_user, reading_user)
629 godmode: params[:godmode],
630 reading_user: reading_user
632 |> user_activities_recipients()
633 |> fetch_activities(params)
637 def fetch_statuses(reading_user, params) do
638 params = Map.put(params, :type, ["Create", "Announce"])
641 godmode: params[:godmode],
642 reading_user: reading_user
644 |> user_activities_recipients()
645 |> fetch_activities(params, :offset)
649 defp user_activities_recipients(%{godmode: true}), do: []
651 defp user_activities_recipients(%{reading_user: reading_user}) do
653 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
655 [Constants.as_public()]
659 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
660 raise "Can't use the child object without preloading!"
663 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
665 [activity, object] in query,
668 "?->>'type' != ? or ?->>'actor' != ?",
677 defp restrict_announce_object_actor(query, _), do: query
# Narrows the query to activities whose id is greater than `:since_id`.
#
# An empty-string `:since_id`, or options without that key, leave the query
# untouched.
defp restrict_since(query, %{since_id: since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
687 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
688 raise "Can't use the child object without preloading!"
691 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
693 [_activity, object] in query,
694 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
698 defp restrict_tag_reject(query, _), do: query
700 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
701 raise "Can't use the child object without preloading!"
704 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
706 [_activity, object] in query,
707 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
711 defp restrict_tag_all(query, _), do: query
713 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
714 raise "Can't use the child object without preloading!"
717 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
719 [_activity, object] in query,
720 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
724 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
726 [_activity, object] in query,
727 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
731 defp restrict_tag(query, _), do: query
733 defp restrict_recipients(query, [], _user), do: query
735 defp restrict_recipients(query, recipients, nil) do
736 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
739 defp restrict_recipients(query, recipients, user) do
742 where: fragment("? && ?", ^recipients, activity.recipients),
743 or_where: activity.actor == ^user.ap_id
# Keeps only activities flagged `local` when `local_only: true` is requested;
# any other options leave the query as is.
defp restrict_local(query, %{local_only: true}),
  do: from(a in query, where: a.local == true)

defp restrict_local(query, _opts), do: query
# Keeps only activities whose `actor` column equals the given `:actor_id`;
# without that option the query is returned unchanged.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: from(a in query, where: a.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
# Filters activities by the "type" field of their JSON `data`.
#
# A binary `:type` is matched with `=`; any other `:type` value is passed to
# `= ANY(?)` (callers in this file supply a list of type names). Options
# without `:type` leave the query unchanged.
defp restrict_type(query, %{type: type}) when is_binary(type),
  do: from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))

defp restrict_type(query, %{type: types}),
  do: from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))

defp restrict_type(query, _opts), do: query
# Keeps only activities whose JSON `data` has a "state" equal to `:state`;
# without that option the query is returned unchanged.
defp restrict_state(query, %{state: state}),
  do: from(a in query, where: fragment("?->>'state' = ?", a.data, ^state))

defp restrict_state(query, _opts), do: query
775 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
777 [_activity, object] in query,
778 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
782 defp restrict_favorited_by(query, _), do: query
784 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
785 raise "Can't use the child object without preloading!"
788 defp restrict_media(query, %{only_media: true}) do
790 [activity, object] in query,
791 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
792 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
796 defp restrict_media(query, _), do: query
798 defp restrict_replies(query, %{exclude_replies: true}) do
800 [_activity, object] in query,
801 where: fragment("?->>'inReplyTo' is null", object.data)
805 defp restrict_replies(query, %{
806 reply_filtering_user: user,
807 reply_visibility: "self"
810 [activity, object] in query,
813 "?->>'inReplyTo' is null OR ? = ANY(?)",
821 defp restrict_replies(query, %{
822 reply_filtering_user: user,
823 reply_visibility: "following"
826 [activity, object] in query,
829 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
831 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
840 defp restrict_replies(query, _), do: query
# Drops "Announce" activities from the query when `exclude_reblogs: true` is
# set; any other options leave the query unchanged.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

defp restrict_reblogs(query, _opts), do: query
848 defp restrict_muted(query, %{with_muted: true}), do: query
850 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
851 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
854 from([activity] in query,
855 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
856 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
859 unless opts[:skip_preload] do
860 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
866 defp restrict_muted(query, _), do: query
868 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
869 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
870 domain_blocks = user.domain_blocks || []
872 following_ap_ids = User.get_friends_ap_ids(user)
875 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
878 [activity, object: o] in query,
879 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
880 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
883 "recipients_contain_blocked_domains(?, ?) = false",
889 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
896 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
904 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
913 defp restrict_blocked(query, _), do: query
915 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
920 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
922 ^[Constants.as_public()]
927 defp restrict_unlisted(query, _), do: query
# When `pinned: true` is requested together with `:pinned_activity_ids`,
# keeps only activities whose id is in that list; otherwise the query is
# returned unchanged.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}),
  do: from(a in query, where: a.id in ^pinned_ids)

defp restrict_pinned(query, _opts), do: query
935 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
936 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
942 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
950 defp restrict_muted_reblogs(query, _), do: query
952 defp restrict_instance(query, %{instance: instance}) do
957 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
961 from(activity in query, where: activity.actor in ^users)
964 defp restrict_instance(query, _), do: query
966 defp restrict_filtered(query, %{user: %User{} = user}) do
967 case Filter.compose_regex(user) do
972 from([activity, object] in query,
974 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
975 activity.actor == ^user.ap_id
980 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
981 restrict_filtered(query, %{user: user})
984 defp restrict_filtered(query, _), do: query
986 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
988 defp exclude_poll_votes(query, _) do
989 if has_named_binding?(query, :object) do
990 from([activity, object: o] in query,
991 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
998 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1000 defp exclude_chat_messages(query, _) do
1001 if has_named_binding?(query, :object) do
1002 from([activity, object: o] in query,
1003 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1010 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1012 defp exclude_invisible_actors(query, _opts) do
1014 User.Query.build(%{invisible: true, select: [:ap_id]})
1016 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1018 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity with the given binary `:exclude_id` from the results;
# a missing or non-binary `:exclude_id` leaves the query unchanged.
defp exclude_id(query, %{exclude_id: excluded}) when is_binary(excluded),
  do: from(a in query, where: a.id != ^excluded)

defp exclude_id(query, _opts), do: query
1027 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1029 defp maybe_preload_objects(query, _) do
1031 |> Activity.with_preloaded_object()
1034 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1036 defp maybe_preload_bookmarks(query, opts) do
1038 |> Activity.with_preloaded_bookmark(opts[:user])
1041 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1043 |> Activity.with_preloaded_report_notes()
1046 defp maybe_preload_report_notes(query, _), do: query
1048 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1050 defp maybe_set_thread_muted_field(query, opts) do
1052 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1055 defp maybe_order(query, %{order: :desc}) do
1057 |> order_by(desc: :id)
1060 defp maybe_order(query, %{order: :asc}) do
1062 |> order_by(asc: :id)
1065 defp maybe_order(query, _), do: query
1067 defp fetch_activities_query_ap_ids_ops(opts) do
1068 source_user = opts[:muting_user]
1069 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1071 ap_id_relationships =
1072 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1073 [:block | ap_id_relationships]
1078 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1080 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1081 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1083 restrict_muted_reblogs_opts =
1084 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1086 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1089 def fetch_activities_query(recipients, opts \\ %{}) do
1090 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1091 fetch_activities_query_ap_ids_ops(opts)
1094 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1098 |> maybe_preload_objects(opts)
1099 |> maybe_preload_bookmarks(opts)
1100 |> maybe_preload_report_notes(opts)
1101 |> maybe_set_thread_muted_field(opts)
1102 |> maybe_order(opts)
1103 |> restrict_recipients(recipients, opts[:user])
1104 |> restrict_replies(opts)
1105 |> restrict_tag(opts)
1106 |> restrict_tag_reject(opts)
1107 |> restrict_tag_all(opts)
1108 |> restrict_since(opts)
1109 |> restrict_local(opts)
1110 |> restrict_actor(opts)
1111 |> restrict_type(opts)
1112 |> restrict_state(opts)
1113 |> restrict_favorited_by(opts)
1114 |> restrict_blocked(restrict_blocked_opts)
1115 |> restrict_muted(restrict_muted_opts)
1116 |> restrict_filtered(opts)
1117 |> restrict_media(opts)
1118 |> restrict_visibility(opts)
1119 |> restrict_thread_visibility(opts, config)
1120 |> restrict_reblogs(opts)
1121 |> restrict_pinned(opts)
1122 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1123 |> restrict_instance(opts)
1124 |> restrict_announce_object_actor(opts)
1125 |> restrict_filtered(opts)
1126 |> Activity.restrict_deactivated_users()
1127 |> exclude_poll_votes(opts)
1128 |> exclude_chat_messages(opts)
1129 |> exclude_invisible_actors(opts)
1130 |> exclude_visibility(opts)
1133 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1134 list_memberships = Pleroma.List.memberships(opts[:user])
1136 fetch_activities_query(recipients ++ list_memberships, opts)
1137 |> Pagination.fetch_paginated(opts, pagination)
1139 |> maybe_update_cc(list_memberships, opts[:user])
1143 Fetches the activities a user has favourited, ordered by when each favourite was added (most recent first).
1145 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1146 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1148 |> Activity.Queries.by_actor()
1149 |> Activity.Queries.by_type("Like")
1150 |> Activity.with_joined_object()
1151 |> Object.with_joined_activity()
1152 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1153 |> order_by([like, _, _], desc_nulls_last: like.id)
1154 |> Pagination.fetch_paginated(
1155 Map.merge(params, %{skip_order: true}),
1160 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1161 Enum.map(activities, fn
1162 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1163 if Enum.any?(bcc, &(&1 in list_memberships)) do
1164 update_in(activity.data["cc"], &[user_ap_id | &1])
1174 defp maybe_update_cc(activities, _, _), do: activities
1176 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1177 from(activity in query,
1179 fragment("? && ?", activity.recipients, ^recipients) or
1180 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1181 ^Constants.as_public() in activity.recipients)
1185 def fetch_activities_bounded(
1187 recipients_with_public,
1189 pagination \\ :keyset
1191 fetch_activities_query([], opts)
1192 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1193 |> Pagination.fetch_paginated(opts, pagination)
1197 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1198 def upload(file, opts \\ []) do
1199 with {:ok, data} <- Upload.store(file, opts) do
1200 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1202 Repo.insert(%Object{data: obj_data})
1206 @spec get_actor_url(any()) :: binary() | nil
1207 defp get_actor_url(url) when is_binary(url), do: url
1208 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1210 defp get_actor_url(url) when is_list(url) do
1216 defp get_actor_url(_url), do: nil
1218 defp object_to_user_data(data) do
1220 data["icon"]["url"] &&
1223 "url" => [%{"href" => data["icon"]["url"]}]
1227 data["image"]["url"] &&
1230 "url" => [%{"href" => data["image"]["url"]}]
1235 |> Map.get("attachment", [])
1236 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1237 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1241 |> Map.get("tag", [])
1243 %{"type" => "Emoji"} -> true
1246 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1247 {String.trim(name, ":"), url}
1250 locked = data["manuallyApprovesFollowers"] || false
1251 data = Transmogrifier.maybe_fix_user_object(data)
1252 discoverable = data["discoverable"] || false
1253 invisible = data["invisible"] || false
1254 actor_type = data["type"] || "Person"
1257 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1258 data["publicKey"]["publicKeyPem"]
1264 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1265 data["endpoints"]["sharedInbox"]
1272 uri: get_actor_url(data["url"]),
1278 discoverable: discoverable,
1279 invisible: invisible,
1282 follower_address: data["followers"],
1283 following_address: data["following"],
1284 bio: data["summary"],
1285 actor_type: actor_type,
1286 also_known_as: Map.get(data, "alsoKnownAs", []),
1287 public_key: public_key,
1288 inbox: data["inbox"],
1289 shared_inbox: shared_inbox
1292 # nickname can be nil because of virtual actors
1293 if data["preferredUsername"] do
1297 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1300 Map.put(user_data, :nickname, nil)
1304 def fetch_follow_information_for_user(user) do
1305 with {:ok, following_data} <-
1306 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1307 {:ok, hide_follows} <- collection_private(following_data),
1308 {:ok, followers_data} <-
1309 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1310 {:ok, hide_followers} <- collection_private(followers_data) do
1313 hide_follows: hide_follows,
1314 follower_count: normalize_counter(followers_data["totalItems"]),
1315 following_count: normalize_counter(following_data["totalItems"]),
1316 hide_followers: hide_followers
1319 {:error, _} = e -> e
# Coerces a remote collection's `totalItems` value into a usable counter.
#
# Returns the value itself when it is a non-negative integer and 0 for
# anything else (missing, non-integer, or negative). The value comes from a
# remotely fetched collection, so a negative number is treated as invalid
# rather than being stored as a follower/following count.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1327 def maybe_update_follow_information(user_data) do
1328 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1329 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1331 {:collections_available,
1332 !!(user_data[:following_address] && user_data[:follower_address])},
1334 fetch_follow_information_for_user(user_data) do
1335 info = Map.merge(user_data[:info] || %{}, info)
1338 |> Map.put(:info, info)
1340 {:user_type_check, false} ->
1343 {:collections_available, false} ->
1346 {:enabled, false} ->
1351 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1358 defp collection_private(%{"first" => %{"type" => type}})
1359 when type in ["CollectionPage", "OrderedCollectionPage"],
1362 defp collection_private(%{"first" => first}) do
1363 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1364 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1367 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1368 {:error, _} = e -> e
1373 defp collection_private(_data), do: {:ok, true}
1375 def user_data_from_user_object(data) do
1376 with {:ok, data} <- MRF.filter(data) do
1377 {:ok, object_to_user_data(data)}
1383 def fetch_and_prepare_user_from_ap_id(ap_id) do
1384 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1385 {:ok, data} <- user_data_from_user_object(data) do
1386 {:ok, maybe_update_follow_information(data)}
1388 {:error, "Object has been deleted" = e} ->
1389 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1393 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1398 def maybe_handle_clashing_nickname(nickname) do
1399 with %User{} = old_user <- User.get_by_nickname(nickname) do
1400 Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")
1403 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1404 |> User.update_and_set_cache()
1408 def make_user_from_ap_id(ap_id) do
1409 user = User.get_cached_by_ap_id(ap_id)
1411 if user && !User.ap_enabled?(user) do
1412 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1414 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1417 |> User.remote_user_changeset(data)
1418 |> User.update_and_set_cache()
1420 maybe_handle_clashing_nickname(data[:nickname])
1423 |> User.remote_user_changeset()
1431 def make_user_from_nickname(nickname) do
1432 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1433 make_user_from_ap_id(ap_id)
1435 _e -> {:error, "No AP id in WebFinger"}
# Filters out broken threads: the result is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing check for a single activity on behalf of `user`.
# Currently this only applies the broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1449 def fetch_direct_messages_query do
1451 |> restrict_type(%{type: "Create"})
1452 |> restrict_visibility(%{visibility: "direct"})
1453 |> order_by([activity], asc: activity.id)