1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`, where `recipients` combines the "to", "cc"
# and "bcc" fields (each defaulting to an empty list).
#
# For "Create" activities the actor is added to the recipients and duplicates
# are removed. `List.wrap/1` is used so that a missing or nil actor adds
# nothing, instead of injecting a stray `[]`/`nil` entry into the list.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation, duplicates are kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
# Checks whether the acting user may act.
#
# A nil actor passes. A binary actor is looked up with
# `User.get_cached_by_ap_id/1` and passes when the found user is not
# deactivated.
# NOTE(review): the branch for an actor that is not found is not visible in
# this excerpt.
54 defp check_actor_is_active(nil), do: true
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
# Enforces the configured `[:instance, :remote_limit]` on the length of the
# embedded object's "content". Activities without a (non-nil) content always
# pass.
defp check_remote_limit(activity) do
  case activity do
    %{"object" => %{"content" => content}} when not is_nil(content) ->
      max_length = Config.get([:instance, :remote_limit])
      String.length(content) <= max_length

    _ ->
      true
  end
end
# Increments the actor's note count, but only for public objects.
# Non-public objects leave the actor untouched and yield `{:ok, actor}`.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Decrements the actor's note count, but only for public objects.
# Non-public objects leave the actor untouched and yield `{:ok, actor}`.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
# Increments the replies counter of the object referenced by "inReplyTo",
# provided the replying object is public. Input that does not match the first
# clause is ignored and returns `:noop`.
# NOTE(review): part of the first clause's head and its non-public branch are
# not visible in this excerpt.
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
87 defp increase_replies_count_if_reply(_create_data), do: :noop
# Object types that `persist/2` stores through `Object.create/1` instead of
# inserting an activity row.
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# Persists incoming data.
#
# First clause: data whose "type" is one of @object_types is created as an
# object.
# Second clause: everything else is inserted as an `%Activity{}`; the `:local`
# flag is read from `meta` with `Keyword.fetch!/2` (raises when absent) and the
# recipients come from `get_recipients/1`. After the insert an expiration job
# is scheduled if applicable, and `{:ok, activity, meta}` is returned.
# NOTE(review): presumably the callback of the `Persisting` behaviour declared
# at the top of the module — confirm.
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
102 Repo.insert(%Activity{
105 recipients: recipients,
106 actor: object["actor"]
108 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
# Inserts an activity map.
#
# The `with` chain only proceeds when `Activity.normalize/1` returns nil for
# the map; a match on `%Activity{}` is handled in the else branch. It then:
#   * fills in defaults (`lazy_put_activity_defaults/2`),
#   * checks the actor is active unless `bypass_actor_check` is set,
#   * enforces the remote content limit,
#   * runs the map through `MRF.filter/1`,
#   * computes the recipients,
#   * stops early for fake activities,
#   * performs the containment check,
#   * inserts the embedded object and then the activity (with expiration).
#
# For `fake` activities the else branch builds an `%Activity{}` struct from the
# map and recipients. A failed remote-limit check returns
# `{:error, :remote_limit}`.
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
# The rich media fetch is started in its own Task, wrapped in
# ConcurrentLimiter.limit/2.
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
135 %Activity{} = activity ->
141 {:containment, _} = error ->
144 {:error, _} = error ->
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
152 recipients: recipients,
# For fake activities the rich media fetch is called directly.
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
# Inserts the activity struct built from `data`, `local` and `recipients` with
# `Repo.insert/1` and, on success, hands it to
# `maybe_create_activity_expiration/1`.
167 defp insert_activity_with_expiration(data, local, recipients) do
171 actor: data["actor"],
172 recipients: recipients
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
# Side effects that follow a stored activity: creates notifications, creates
# or bumps the conversation on behalf of the activity's actor, and streams the
# resulting participations.
# NOTE(review): one statement of this function is not visible in this excerpt.
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
186 stream_out_participations(participations)
# Schedules removal of expiring activities.
#
# When the activity data holds a `%DateTime{}` under "expires_at", a
# `Pleroma.Workers.PurgeExpiredActivity` job is enqueued with the activity id
# and the expiry time. Any other activity is returned as `{:ok, activity}`.
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity` and, when the actor
# resolves to a cached `%User{}`, marks the conversation as read for that user.
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
# Returns the freshly reloaded participations of a conversation wrapped in
# `{:ok, conversation}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  # `force: true` reloads the association even if it was already loaded.
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
# stream_out_participations/1: preloads the user of each participation and
# pushes the list to the "participation" stream.
219 def stream_out_participations(participations) do
222 |> Repo.preload(:user)
224 Streamer.stream("participation", participations)
# stream_out_participations/2: for an object with a "context", looks up the
# matching conversation, preloads its participations and looks up the latest
# direct activity id for that context. The participations are streamed only
# when such an id exists. Anything else returns `:noop`.
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
243 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by `Topics.get_activity_topics/1`. Every other activity is handled by the
# fallback clause (its body is not visible in this excerpt).
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
252 def stream_out(_activity) do
# Creates a "Create" activity by running `do_create/2` inside a database
# transaction.
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body of `create/2`.
#
# Inserts the create data, then (unless the activity is fake) bumps the reply
# counter, bumps the actor's note count for public activities, notifies and
# streams, and federates. When the `:env` config is `:benchmark`
# (`quick_insert?`) the chain stops after the reply counter. Failures that
# reach the final else clause roll the transaction back.
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
272 %{to: to, actor: actor, published: published, context: context, object: object},
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
285 {:quick_insert, true, activity} ->
288 {:fake, true, activity} ->
292 Repo.rollback(message)
# Builds listen data from the given params, inserts it, notifies/streams and
# federates the resulting activity. `local` is true unless `params[:local]` is
# exactly `false`.
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
305 %{to: to, actor: actor, published: published, context: context, object: object},
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
# Undoes a follow by running `do_unfollow/4` inside a database transaction.
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of `unfollow/4`: fetches the latest follow activity between
# the two users, sets its state to "cancelled", inserts the unfollow activity,
# notifies/streams and federates it. An `{:error, error}` result rolls the
# transaction back.
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
335 {:error, error} -> Repo.rollback(error)
# Files a report ("flag") by running `do_flag/1` inside a database
# transaction.
# NOTE(review): the heads of `flag/1` and `do_flag/1` are not visible in this
# excerpt; the lines from `local = ...` onwards belong to `do_flag/1`.
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
355 # only accept false as false value
356 local = !(params[:local] == false)
# `forward` likewise defaults to true unless explicitly false.
357 forward = !(params[:forward] == false)
359 additional = params[:additional] || %{}
# Two addressing variants: with the reported account's ap_id in "cc", or with
# empty addressing (presumably chosen by `forward` — confirm).
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
365 Map.merge(additional, %{"to" => [], "cc" => []})
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
373 maybe_federate(stripped_activity) do
# Every superuser with a non-nil email gets a report email, delivered
# asynchronously.
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
384 {:error, error} -> Repo.rollback(error)
# Announces that `origin` has moved to `target`.
#
# Proceeds only when `origin.ap_id` is listed in `target.also_known_as`;
# otherwise an `{:error, message}` tuple is returned. On success the activity
# is inserted, notified/streamed and federated, and a "move_following"
# background job is enqueued with both user ids.
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for the activities of a conversation context.
#
# When a user is given, the recipients are the user's own ap_id, the addresses
# the user follows and the public address. The query applies the optional
# preloads, the block, recipient and filter restrictions, matches on activity
# type and context, excludes poll votes and orders by id, newest first.
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
# Returns the activities of a context, built on top of
# `fetch_activities_for_context_query/2`.
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
# Looks up the id of the latest direct-visibility activity in a context (nil
# when there is none, per the spec). Preloading is skipped because only the id
# is needed; caller-supplied opts override that default in the merge.
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection.
# The `:user` option is dropped before querying; unlisted activities are
# filtered out only when `opts` asks for it (see `restrict_unlisted/2`).
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
# Fetches a page of public activities with unlisted ones filtered out, by
# forcing `restrict_unlisted: true` onto the options.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only_opts, pagination)
end
# Visibility values accepted by restrict_visibility/2 and exclude_visibility/2.
479 @valid_visibilities ~w[direct unlisted public private]
# Restricts the query to activities with the requested visibility, computed in
# SQL by `activity_visibility(actor, recipients, data)`.
#
# Accepts either a list of visibilities (all must be valid) or a single one.
# Options without a :visibility key leave the query unchanged.
# NOTE(review): in the list clause the list is interpolated into the log
# message as-is, which may itself raise for non-string elements; `inspect/1`
# would be safer.
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# NOTE(review): this clause ignores the query and its last visible expression
# is the Logger call, so it presumably does not return a query — confirm that
# callers never continue a pipeline after an invalid visibility.
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2, driven by the :exclude_visibilities
# option, which may be a list of visibilities or a single one. Invalid values
# are logged; a nil value or a missing key leaves the query unchanged.
# NOTE(review): as in restrict_visibility/2, the list is interpolated into the
# log message directly.
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
# nil is deliberately excluded from the "invalid" guard so that it falls
# through to the no-op clause below.
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
# Restricts the query to activities whose thread is visible to the user, using
# the SQL function `thread_visibility(ap_id, activity_id)`.
#
# The first two clauses match when thread containment is switched off in the
# config or for the user; their bodies are not visible in this excerpt
# (presumably they return the query unchanged — confirm). Without a user the
# query is returned unchanged.
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`.
# The params are extended with the reading user and the author's ap_id; the
# recipients come from `user_activities_recipients/1`.
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
# Fetches the "Create" and "Announce" activities of `user` as seen by
# `reading_user`, passing along the user's pinned activity ids.
#
# The code branches on whether the reading user blocks the author; the visible
# branch sets the reading user as blocking and muting user. The pagination
# type comes from `params[:pagination_type]` and defaults to :keyset.
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
606 pagination_type = Map.get(params, :pagination_type) || :keyset
609 godmode: params[:godmode],
610 reading_user: reading_user
612 |> user_activities_recipients()
613 |> fetch_activities(params, pagination_type)
# Fetches "Create" and "Announce" activities visible to `reading_user`, using
# offset pagination.
617 def fetch_statuses(reading_user, params) do
618 params = Map.put(params, :type, ["Create", "Announce"])
621 godmode: params[:godmode],
622 reading_user: reading_user
624 |> user_activities_recipients()
625 |> fetch_activities(params, :offset)
# Computes the recipient list used to scope a user's activities.
#
# In godmode the list is empty, which makes restrict_recipients/3 return the
# query unrestricted. Otherwise it is either the public address plus the
# reading user's own ap_id and followed addresses, or the public address only
# (the condition selecting between the two is not visible in this excerpt,
# presumably whether a reading user is present).
629 defp user_activities_recipients(%{godmode: true}), do: []
631 defp user_activities_recipients(%{reading_user: reading_user}) do
633 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
635 [Constants.as_public()]
# Filter driven by the :announce_filtering_user option, combining the activity
# type and the object's actor (the fragment arguments are not visible in this
# excerpt). It needs the joined object, so it raises when preloading is
# skipped. Without the option the query is returned unchanged.
639 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
640 raise "Can't use the child object without preloading!"
643 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
645 [activity, object] in query,
648 "?->>'type' != ? or ?->>'actor' != ?",
657 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `since_id`.
# An empty-string or absent :since_id leaves the query unchanged.
defp restrict_since(query, opts) do
  case opts do
    %{since_id: ""} -> query
    %{since_id: since_id} -> where(query, [activity], activity.id > ^since_id)
    _ -> query
  end
end
# Drops activities whose object carries any of the tags in :tag_reject
# (jsonb `?|` operator; the `?` is escaped because it is the fragment
# placeholder). Needs the joined object, hence the raise when preloading is
# skipped. An empty list or absent option is a no-op.
667 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
668 raise "Can't use the child object without preloading!"
671 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
673 [_activity, object] in query,
674 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
678 defp restrict_tag_reject(query, _), do: query
# Keeps activities whose object carries every tag in :tag_all (jsonb `?&`
# operator). Needs the joined object, hence the raise when preloading is
# skipped. An empty list or absent option is a no-op.
680 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
681 raise "Can't use the child object without preloading!"
684 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
686 [_activity, object] in query,
687 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
691 defp restrict_tag_all(query, _), do: query
# Keeps activities whose object carries the requested tag(s): any of a list
# (jsonb `?|`) or exactly one binary tag (jsonb `?`). Needs the joined object,
# hence the raise when preloading is skipped. Without the option the query is
# returned unchanged.
693 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
694 raise "Can't use the child object without preloading!"
697 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
699 [_activity, object] in query,
700 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
704 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
706 [_activity, object] in query,
707 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
711 defp restrict_tag(query, _), do: query
# Restricts the query to activities whose recipients overlap (`&&`) with the
# given list.
#
# An empty recipient list means "no restriction". With a user, activities
# authored by that user are included as well.
# NOTE(review): `or_where` combines with whatever `where` conditions are
# already on the query, so the position of this step in a pipeline matters.
713 defp restrict_recipients(query, [], _user), do: query
715 defp restrict_recipients(query, recipients, nil) do
716 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
719 defp restrict_recipients(query, recipients, user) do
722 where: fragment("? && ?", ^recipients, activity.recipients),
723 or_where: activity.actor == ^user.ap_id
# Keeps only local activities when `local_only: true` is requested.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
# Keeps only activities authored by the actor given in :actor_id.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
# Keeps only activities of the requested type. A binary :type is compared for
# equality; any other value is matched with `ANY(...)`, i.e. treated as a list
# of accepted types.
defp restrict_type(query, %{type: type}) when is_binary(type),
  do: where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))

defp restrict_type(query, %{type: types}),
  do: where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))

defp restrict_type(query, _opts), do: query
# Keeps only activities whose data "state" equals the :state option.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _opts), do: query
# Keeps activities whose object lists the given ap_id under "likes" (jsonb
# `?` operator, escaped because `?` is the fragment placeholder).
755 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
757 [_activity, object] in query,
758 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
762 defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, hence the raise whenever
# :only_media is combined with skipped preloading.
764 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
765 raise "Can't use the child object without preloading!"
768 defp restrict_media(query, %{only_media: true}) do
770 [activity, object] in query,
771 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
772 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
776 defp restrict_media(query, _), do: query
# Reply filtering, in three modes:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` with a reply-filtering user keeps non-replies
#     plus the rows matched by the `? = ANY(?)` condition (its arguments are
#     not visible in this excerpt);
#   * `reply_visibility: "following"` keeps non-Create activities, non-replies,
#     replies addressed to the user or the user's friends, and the user's own
#     activities (the SQL comments in the fragment give the details).
# Any other options leave the query unchanged.
778 defp restrict_replies(query, %{exclude_replies: true}) do
780 [_activity, object] in query,
781 where: fragment("?->>'inReplyTo' is null", object.data)
785 defp restrict_replies(query, %{
786 reply_filtering_user: %User{} = user,
787 reply_visibility: "self"
790 [activity, object] in query,
793 "?->>'inReplyTo' is null OR ? = ANY(?)",
801 defp restrict_replies(query, %{
802 reply_filtering_user: %User{} = user,
803 reply_visibility: "following"
806 [activity, object] in query,
810 ?->>'type' != 'Create' -- This isn't a Create
811 OR ?->>'inReplyTo' is null -- this isn't a reply
812 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
813 -- unless they are the author (because authors
814 -- are also part of the recipients). This leads
815 -- to a bug that self-replies by friends won't
817 OR ? = ? -- The actor is us
821 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
830 defp restrict_replies(query, _), do: query
# Drops "Announce" activities (reblogs) when `exclude_reblogs: true` is set.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _opts), do: query
# Hides activities from users muted by the :muting_user.
#
# `with_muted: true` disables the filter. The muted ap_ids are taken from the
# preloaded :muted_users_ap_ids option when present, otherwise fetched for the
# user. Unless preloading is skipped, rows with a thread mute are dropped too
# (the named binding :thread_mute must be NULL).
838 defp restrict_muted(query, %{with_muted: true}), do: query
840 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
841 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
844 from([activity] in query,
845 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
848 "not (?->'to' \\?| ?) or ? = ?",
856 unless opts[:skip_preload] do
857 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
863 defp restrict_muted(query, _), do: query
# Hides activities related to users and domains blocked by the :blocking_user.
#
# Blocked ap_ids come from the preloaded :blocked_users_ap_ids option when
# present, otherwise they are fetched for the user. The object is joined if
# the query does not already have an :object binding. The visible conditions
# cover the activity's actor, its recipients, blocked domains among the
# recipients, announces, and the actor domains of activity and object; the
# last two carry an `or ... = ANY(?)` exception (argument lists are not
# visible in this excerpt).
865 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
866 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
867 domain_blocks = user.domain_blocks || []
869 following_ap_ids = User.get_friends_ap_ids(user)
872 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
875 [activity, object: o] in query,
876 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
879 "((not (? && ?)) or ? = ?)",
887 "recipients_contain_blocked_domains(?, ?) = false",
893 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
900 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
908 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
917 defp restrict_blocked(query, _), do: query
# With `restrict_unlisted: true`, drops activities that have the public
# address in "cc" (a missing "cc" is coalesced to an empty jsonb value).
919 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
924 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
926 ^[Constants.as_public()]
931 defp restrict_unlisted(query, _), do: query
# With `pinned: true`, keeps only the activities whose id is in
# :pinned_activity_ids.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _opts), do: query
# Hides "Announce" activities made by users whose reblogs the :muting_user has
# muted. The ap_ids come from the preloaded :reblog_muted_users_ap_ids option
# when present, otherwise they are fetched for the user.
939 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
940 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
946 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
954 defp restrict_muted_reblogs(query, _), do: query
# Keeps only activities whose actor URL has the given instance as its host
# part (third '/'-separated segment of the actor column).
956 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
959 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
963 defp restrict_instance(query, _), do: query
# Applies the user's content filters: activities whose object "content"
# matches the regex composed by `Filter.compose_regex/1` (case-insensitive
# `~*`) are hidden, except for the user's own activities.
#
# The user is taken from :user, falling back to :blocking_user. Without either
# the query is returned unchanged.
965 defp restrict_filtered(query, %{user: %User{} = user}) do
966 case Filter.compose_regex(user) do
971 from([activity, object] in query,
973 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
974 activity.actor == ^user.ap_id
979 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
980 restrict_filtered(query, %{user: user})
983 defp restrict_filtered(query, _), do: query
# Drops activities whose object is of type "Answer" (poll votes) unless
# `include_poll_votes: true` is given. The filter shown applies when the query
# has an :object binding; the other branch is not visible in this excerpt.
985 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
987 defp exclude_poll_votes(query, _) do
988 if has_named_binding?(query, :object) do
989 from([activity, object: o] in query,
990 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Drops activities whose object is of type "ChatMessage" unless
# `include_chat_messages: true` is given. The filter shown applies when the
# query has an :object binding; the other branch is not visible in this
# excerpt.
997 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
999 defp exclude_chat_messages(query, _) do
1000 if has_named_binding?(query, :object) do
1001 from([activity, object: o] in query,
1002 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Drops activities authored by invisible users unless `invisible_actors: true`
# is given. The ap_ids of invisible users are collected through
# `User.Query.build/1` and excluded with a `not in` condition.
# NOTE(review): this appears to load the invisible users on every call —
# confirm whether that is acceptable.
1009 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1011 defp exclude_invisible_actors(query, _opts) do
1013 User.Query.build(%{invisible: true, select: [:ap_id]})
1015 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1017 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity with the given id when a binary :exclude_id is passed.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _opts), do: query
# Preloads the activity's object unless `skip_preload: true` is set.
1026 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1028 defp maybe_preload_objects(query, _) do
1030 |> Activity.with_preloaded_object()
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
1033 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1035 defp maybe_preload_bookmarks(query, opts) do
1037 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes, but only when `preload_report_notes: true` is set.
1040 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1042 |> Activity.with_preloaded_report_notes()
1045 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
1047 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1049 defp maybe_set_thread_muted_field(query, opts) do
1051 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id, descending or ascending, when :order is :desc or :asc.
# Any other options leave the ordering untouched.
1054 defp maybe_order(query, %{order: :desc}) do
1056 |> order_by(desc: :id)
1059 defp maybe_order(query, %{order: :asc}) do
1061 |> order_by(asc: :id)
1064 defp maybe_order(query, _), do: query
# Loads the relationship ap_ids needed by the block/mute filters through a
# single `User.outgoing_relationships_ap_ids/2` call.
#
# Mutes and reblog mutes are requested when a muting user is present; blocks
# are added when the blocking user is that same user. Returns the three option
# maps for restrict_blocked/2, restrict_muted/2 and restrict_muted_reblogs/2.
# The preloaded ids are the left side of each `Map.merge/2`, so values already
# present in `opts` take precedence.
1066 defp fetch_activities_query_ap_ids_ops(opts) do
1067 source_user = opts[:muting_user]
1068 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1070 ap_id_relationships =
1071 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1072 [:block | ap_id_relationships]
1077 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1079 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1080 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1082 restrict_muted_reblogs_opts =
1083 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1085 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query for the given recipients and options.
#
# The block/mute relationship ids are loaded once up front
# (`fetch_activities_query_ap_ids_ops/1`) and handed to the matching
# restrictions. Every `restrict_*`/`exclude_*` step is a no-op unless its
# option is present, so callers opt in through `opts`.
#
# `restrict_filtered/2` is applied exactly once: applying it a second time
# only repeats the same WHERE condition and the regex composition.
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
# Fetches a page of activities for the given recipients.
#
# The list memberships of `opts[:user]` are added to the recipients, and the
# fetched activities are passed through `maybe_update_cc/3`.
# NOTE(review): one pipeline step between pagination and `maybe_update_cc/3`
# is not visible in this excerpt.
1132 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1133 list_memberships = Pleroma.List.memberships(opts[:user])
1135 fetch_activities_query(recipients ++ list_memberships, opts)
1136 |> Pagination.fetch_paginated(opts, pagination)
1138 |> maybe_update_cc(list_memberships, opts[:user])
1142 Fetches the activities favourited by the user, ordered by when each favourite was added (most recent first).
# Starts from the user's "Like" activities, joins the liked object and the
# activity belonging to it, and returns that activity with the object attached.
# The id of the like is exposed as `pagination_id` and used for ordering, so
# `skip_order: true` is passed to the paginator to keep that ordering.
1144 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1145 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1147 |> Activity.Queries.by_actor()
1148 |> Activity.Queries.by_type("Like")
1149 |> Activity.with_joined_object()
1150 |> Object.with_joined_activity()
1151 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1152 |> order_by([like, _, _], desc_nulls_last: like.id)
1153 |> Pagination.fetch_paginated(
1154 Map.merge(params, %{skip_order: true}),
# For a user with list memberships: every activity whose "bcc" contains one of
# those lists gets the user's ap_id prepended to its "cc". Without list
# memberships or without a user the activities are returned unchanged.
1159 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1160 Enum.map(activities, fn
1161 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1162 if Enum.any?(bcc, &(&1 in list_memberships)) do
1163 update_in(activity.data["cc"], &[user_ap_id | &1])
1173 defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also being addressed to the public address.
1175 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1176 from(activity in query,
1178 fragment("? && ?", activity.recipients, ^recipients) or
1179 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1180 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities using the bounded recipient rule of
# `fetch_activities_bounded_query/3`. The base query is built with an empty
# recipient list, so the recipient restriction comes only from the bounded
# query.
1184 def fetch_activities_bounded(
1186 recipients_with_public,
1188 pagination \\ :keyset
1190 fetch_activities_query([], opts)
1191 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1192 |> Pagination.fetch_paginated(opts, pagination)
# Stores the uploaded file through `Upload.store/2` and inserts an `%Object{}`
# holding the resulting data. `opts[:actor]` is recorded under "actor" when
# present.
1196 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1197 def upload(file, opts \\ []) do
1198 with {:ok, data} <- Upload.store(file, opts) do
1199 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1201 Repo.insert(%Object{data: obj_data})
# Extracts a URL string from the "url" field of an actor, which may be a plain
# string, a map with a binary "href", or a list (the list clause body is not
# visible in this excerpt). Anything else yields nil.
1205 @spec get_actor_url(any()) :: binary() | nil
1206 defp get_actor_url(url) when is_binary(url), do: url
1207 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1209 defp get_actor_url(url) when is_list(url) do
1215 defp get_actor_url(_url), do: nil
# Converts a remote actor object into the attribute map used to build a user.
#
# Extracts avatar ("icon") and banner ("image") URLs, profile fields
# ("PropertyValue" attachments), custom emoji (tags of type "Emoji"), flags,
# public key, shared inbox and collection addresses. The nickname is
# "preferredUsername@host" when a preferred username is present and nil
# otherwise.
1217 defp object_to_user_data(data) do
# NOTE(review): `data["icon"]["url"]` (and the "image" equivalent below) is
# nil-safe, but would raise if the field were a list instead of a map.
1219 data["icon"]["url"] &&
1222 "url" => [%{"href" => data["icon"]["url"]}]
1226 data["image"]["url"] &&
1229 "url" => [%{"href" => data["image"]["url"]}]
1234 |> Map.get("attachment", [])
# NOTE(review): the single-clause anonymous function raises for an attachment
# entry without a "type" key.
1235 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1236 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1240 |> Map.get("tag", [])
1242 %{"type" => "Emoji"} -> true
# Emoji names are stored without the surrounding colons.
1245 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1246 {String.trim(name, ":"), url}
1249 is_locked = data["manuallyApprovesFollowers"] || false
1250 capabilities = data["capabilities"] || %{}
1251 accepts_chat_messages = capabilities["acceptsChatMessages"]
1252 data = Transmogrifier.maybe_fix_user_object(data)
1253 is_discoverable = data["discoverable"] || false
1254 invisible = data["invisible"] || false
1255 actor_type = data["type"] || "Person"
1258 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1259 data["publicKey"]["publicKeyPem"]
1265 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1266 data["endpoints"]["sharedInbox"]
1273 uri: get_actor_url(data["url"]),
1278 is_locked: is_locked,
1279 is_discoverable: is_discoverable,
1280 invisible: invisible,
1283 follower_address: data["followers"],
1284 following_address: data["following"],
1285 bio: data["summary"] || "",
1286 actor_type: actor_type,
1287 also_known_as: Map.get(data, "alsoKnownAs", []),
1288 public_key: public_key,
1289 inbox: data["inbox"],
1290 shared_inbox: shared_inbox,
1291 accepts_chat_messages: accepts_chat_messages
1294 # nickname can be nil because of virtual actors
1295 if data["preferredUsername"] do
1299 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1302 Map.put(user_data, :nickname, nil)
# Fetches the remote following and followers collections of a user and derives
# the follower/following counters (from "totalItems") plus the
# hide_follows/hide_followers flags (from `collection_private/1`).
# `{:error, _}` results are passed through unchanged.
1306 def fetch_follow_information_for_user(user) do
1307 with {:ok, following_data} <-
1308 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1309 {:ok, hide_follows} <- collection_private(following_data),
1310 {:ok, followers_data} <-
1311 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1312 {:ok, hide_followers} <- collection_private(followers_data) do
1315 hide_follows: hide_follows,
1316 follower_count: normalize_counter(followers_data["totalItems"]),
1317 following_count: normalize_counter(following_data["totalItems"]),
1318 hide_followers: hide_followers
1321 {:error, _} = e -> e
# Returns the counter when it is an integer; any other value (nil, strings,
# floats, ...) is normalized to 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Enriches user data with follower/following information when
#   * `[:instance, :external_user_synchronization]` is enabled,
#   * the user type is "Person" or "Service", and
#   * both the following and follower addresses are present.
# Each failed precondition has its own else clause (bodies are not visible in
# this excerpt); a failure of the fetch itself is logged together with the
# user's ap_id.
1329 def maybe_update_follow_information(user_data) do
1330 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1331 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1333 {:collections_available,
1334 !!(user_data[:following_address] && user_data[:follower_address])},
1336 fetch_follow_information_for_user(user_data) do
1337 info = Map.merge(user_data[:info] || %{}, info)
1340 |> Map.put(:info, info)
1342 {:user_type_check, false} ->
1345 {:collections_available, false} ->
1348 {:enabled, false} ->
1353 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1360 defp collection_private(%{"first" => %{"type" => type}})
1360 defp collection_private(%{"first" => %{"type" => type}})
# (collection_private/1, continued) Decides whether a remote collection is
# private, returning `{:ok, boolean}` or an error tuple.
#
# A collection with an embedded first page matches the first clause (body not
# visible in this excerpt). A referenced first page is fetched: a 401/403
# response means private; other errors are passed through. A collection
# without a "first" entry is treated as private.
1361 when type in ["CollectionPage", "OrderedCollectionPage"],
1364 defp collection_private(%{"first" => first}) do
1365 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1366 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1369 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1370 {:error, _} = e -> e
1375 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through `MRF.filter/1` and, when accepted, converts it
# with `object_to_user_data/1`.
# NOTE(review): the handling of a rejected object is not visible in this
# excerpt.
1377 def user_data_from_user_object(data) do
1378 with {:ok, data} <- MRF.filter(data) do
1379 {:ok, object_to_user_data(data)}
# Fetches a remote actor by ap_id, converts it to user data and adds follow
# information where applicable.
#
# Failures are logged at different levels: debug for deleted objects, info for
# MRF rejections, error for everything else.
1385 def fetch_and_prepare_user_from_ap_id(ap_id) do
1386 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1387 {:ok, data} <- user_data_from_user_object(data) do
1388 {:ok, maybe_update_follow_information(data)}
1390 # If this has been deleted, only log a debug and not an error
1391 {:error, "Object has been deleted" = e} ->
1392 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1395 {:error, {:reject, reason} = e} ->
1396 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1400 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees a nickname that is already taken by a different, older user.
#
# When an existing user has the same nickname but another ap_id, the old user
# is renamed to "<old id>.<old nickname>". When the ap_ids are equal nothing
# is changed and a message is logged instead.
1405 def maybe_handle_clashing_nickname(data) do
1406 with nickname when is_binary(nickname) <- data[:nickname],
1407 %User{} = old_user <- User.get_by_nickname(nickname),
1408 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1410 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1416 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1417 |> User.update_and_set_cache()
1419 {:ap_id_comparison, true} ->
1421 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Creates or refreshes the local record of a remote user.
#
# A cached user that is not yet AP-enabled is upgraded through the
# Transmogrifier. Otherwise the actor is fetched and prepared; the visible
# branches either update an existing user from the fetched data, or resolve
# nickname clashes and build a new remote user changeset.
1429 def make_user_from_ap_id(ap_id) do
1430 user = User.get_cached_by_ap_id(ap_id)
1432 if user && !User.ap_enabled?(user) do
1433 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1435 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1438 |> User.remote_user_changeset(data)
1439 |> User.update_and_set_cache()
1441 maybe_handle_clashing_nickname(data)
1444 |> User.remote_user_changeset()
# Resolves a nickname through WebFinger and creates the user from the
# discovered ap_id. Any WebFinger result without a non-nil "ap_id" yields
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: true only when the whole thread of the activity
# is visible to the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Query for direct messages: "Create" activities with "direct" visibility,
# ordered by id, oldest first.
1470 def fetch_direct_messages_query do
1472 |> restrict_type(%{type: "Create"})
1473 |> restrict_visibility(%{visibility: "direct"})
1474 |> order_by([activity], asc: activity.id)