1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
54 defp check_actor_is_active(nil), do: true
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
# Compares the length of the embedded object's "content" with the value
# configured under `[:instance, :remote_limit]`. Data that has no embedded
# object, or whose "content" is missing or nil, always passes.
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  max_length = Config.get([:instance, :remote_limit])
  body_length = String.length(body)

  body_length <= max_length
end

defp check_remote_limit(_data), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` without touching the counter.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
87 defp increase_replies_count_if_reply(_create_data), do: :noop
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
102 Repo.insert(%Activity{
105 recipients: recipients,
106 actor: object["actor"]
# TODO: add tests for expired activities once the Note type is supported in the new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
135 %Activity{} = activity ->
141 {:containment, _} = error ->
144 {:error, _} = error ->
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
152 recipients: recipients,
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
167 defp insert_activity_with_expiration(data, local, recipients) do
171 actor: data["actor"],
172 recipients: recipients
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
186 stream_out_participations(participations)
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
211 defp get_participations({:ok, conversation}) do
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
217 defp get_participations(_), do: []
219 def stream_out_participations(participations) do
222 |> Repo.preload(:user)
224 Streamer.stream("participation", participations)
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
243 def stream_out_participations(_, _), do: :noop
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
252 def stream_out(_activity) do
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
272 %{to: to, actor: actor, published: published, context: context, object: object},
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
285 {:quick_insert, true, activity} ->
288 {:fake, true, activity} ->
292 Repo.rollback(message)
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
305 %{to: to, actor: actor, published: published, context: context, object: object},
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
335 {:error, error} -> Repo.rollback(error)
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
359 additional = params[:additional] || %{}
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
365 Map.merge(additional, %{"to" => [], "cc" => []})
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
384 {:error, error} -> Repo.rollback(error)
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
@doc """
Fetches a page of activities using `Constants.as_public()` as the only recipient.

The `:user` key is dropped from `opts` before anything else happens. The
remaining options are handed to `fetch_activities_query/2`,
`restrict_unlisted/2` and `Pagination.fetch_paginated/3`. `pagination`
defaults to `:keyset`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  query =
    public_recipients
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
479 @valid_visibilities ~w[direct unlisted public private]
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
608 !Map.has_key?(params, :offset) -> :keyset
613 godmode: params[:godmode],
614 reading_user: reading_user
616 |> user_activities_recipients()
617 |> fetch_activities(params, pagination_type)
621 def fetch_statuses(reading_user, params) do
622 params = Map.put(params, :type, ["Create", "Announce"])
625 godmode: params[:godmode],
626 reading_user: reading_user
628 |> user_activities_recipients()
629 |> fetch_activities(params, :offset)
633 defp user_activities_recipients(%{godmode: true}), do: []
635 defp user_activities_recipients(%{reading_user: reading_user}) do
637 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
639 [Constants.as_public()]
643 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
644 raise "Can't use the child object without preloading!"
647 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
649 [activity, object] in query,
652 "?->>'type' != ? or ?->>'actor' != ?",
661 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
# An empty-string `:since_id`, or options without that key, leave the query
# untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: lower_bound}),
  do: from(a in query, where: a.id > ^lower_bound)

defp restrict_since(query, _opts), do: query
671 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
672 raise "Can't use the child object without preloading!"
675 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
677 [_activity, object] in query,
678 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
682 defp restrict_tag_reject(query, _), do: query
684 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
685 raise "Can't use the child object without preloading!"
688 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
690 [_activity, object] in query,
691 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
695 defp restrict_tag_all(query, _), do: query
697 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
698 raise "Can't use the child object without preloading!"
701 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
703 [_activity, object] in query,
704 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
708 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
710 [_activity, object] in query,
711 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
715 defp restrict_tag(query, _), do: query
717 defp restrict_recipients(query, [], _user), do: query
719 defp restrict_recipients(query, recipients, nil) do
720 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
723 defp restrict_recipients(query, recipients, user) do
726 where: fragment("? && ?", ^recipients, activity.recipients),
727 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, narrows the query to activities whose `local`
# column is true; any other options leave the query as it is.
defp restrict_local(query, %{local_only: true}),
  do: from(a in query, where: a.local == true)

defp restrict_local(query, _opts), do: query
# When `:actor_id` is present in the options, narrows the query to activities
# whose `actor` column equals it; otherwise the query is returned unchanged.
defp restrict_actor(query, %{actor_id: actor_ap_id}),
  do: from(a in query, where: a.actor == ^actor_ap_id)

defp restrict_actor(query, _opts), do: query
# Filters on the activity's JSON "type" field.
#   * a binary `:type` must match exactly;
#   * any other `:type` value is handed to `ANY(?)`, i.e. it is treated as a
#     collection of acceptable types;
#   * options without `:type` leave the query unchanged.
defp restrict_type(query, %{type: single_type}) when is_binary(single_type),
  do: from(a in query, where: fragment("?->>'type' = ?", a.data, ^single_type))

defp restrict_type(query, %{type: types}),
  do: from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))

defp restrict_type(query, _opts), do: query
# When `:state` is present in the options, keeps only activities whose JSON
# "state" field equals it; otherwise the query is returned unchanged.
defp restrict_state(query, %{state: wanted_state}),
  do: from(a in query, where: fragment("?->>'state' = ?", a.data, ^wanted_state))

defp restrict_state(query, _opts), do: query
759 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
761 [_activity, object] in query,
762 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
766 defp restrict_favorited_by(query, _), do: query
768 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
769 raise "Can't use the child object without preloading!"
772 defp restrict_media(query, %{only_media: true}) do
774 [activity, object] in query,
775 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
776 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
780 defp restrict_media(query, _), do: query
782 defp restrict_replies(query, %{exclude_replies: true}) do
784 [_activity, object] in query,
785 where: fragment("?->>'inReplyTo' is null", object.data)
789 defp restrict_replies(query, %{
790 reply_filtering_user: %User{} = user,
791 reply_visibility: "self"
794 [activity, object] in query,
797 "?->>'inReplyTo' is null OR ? = ANY(?)",
805 defp restrict_replies(query, %{
806 reply_filtering_user: %User{} = user,
807 reply_visibility: "following"
810 [activity, object] in query,
814 ?->>'type' != 'Create' -- This isn't a Create
815 OR ?->>'inReplyTo' is null -- this isn't a reply
816 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
817 -- unless they are the author (because authors
818 -- are also part of the recipients). This leads
819 -- to a bug that self-replies by friends won't
821 OR ? = ? -- The actor is us
825 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
834 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops every activity whose JSON "type" is
# 'Announce'; any other options leave the query as it is.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

defp restrict_reblogs(query, _opts), do: query
842 defp restrict_muted(query, %{with_muted: true}), do: query
844 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
845 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
848 from([activity] in query,
849 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
852 "not (?->'to' \\?| ?) or ? = ?",
860 unless opts[:skip_preload] do
861 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
867 defp restrict_muted(query, _), do: query
869 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
870 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
871 domain_blocks = user.domain_blocks || []
873 following_ap_ids = User.get_friends_ap_ids(user)
876 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
879 [activity, object: o] in query,
880 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
883 "((not (? && ?)) or ? = ?)",
891 "recipients_contain_blocked_domains(?, ?) = false",
897 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
904 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
912 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
921 defp restrict_blocked(query, _), do: query
923 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
928 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
930 ^[Constants.as_public()]
935 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and a `:pinned_activity_ids` entry, keeps only the
# activities whose id is in that collection. Without both keys the query is
# returned unchanged.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}),
  do: from(a in query, where: a.id in ^pinned_ids)

defp restrict_pinned(query, _opts), do: query
943 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
944 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
950 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
958 defp restrict_muted_reblogs(query, _), do: query
960 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
963 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
967 defp restrict_instance(query, _), do: query
969 defp restrict_filtered(query, %{user: %User{} = user}) do
970 case Filter.compose_regex(user) do
975 from([activity, object] in query,
977 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
978 activity.actor == ^user.ap_id
983 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
984 restrict_filtered(query, %{user: user})
987 defp restrict_filtered(query, _), do: query
989 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
991 defp exclude_poll_votes(query, _) do
992 if has_named_binding?(query, :object) do
993 from([activity, object: o] in query,
994 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1001 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1003 defp exclude_chat_messages(query, _) do
1004 if has_named_binding?(query, :object) do
1005 from([activity, object: o] in query,
1006 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1013 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1015 defp exclude_invisible_actors(query, _opts) do
1017 User.Query.build(%{invisible: true, select: [:ap_id]})
1019 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1021 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the single activity whose id equals `:exclude_id` from the results.
# Only applied when `:exclude_id` is a binary; otherwise the query is
# returned unchanged.
defp exclude_id(query, %{exclude_id: skipped_id}) when is_binary(skipped_id),
  do: from(a in query, where: a.id != ^skipped_id)

defp exclude_id(query, _opts), do: query
1030 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1032 defp maybe_preload_objects(query, _) do
1034 |> Activity.with_preloaded_object()
1037 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1039 defp maybe_preload_bookmarks(query, opts) do
1041 |> Activity.with_preloaded_bookmark(opts[:user])
1044 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1046 |> Activity.with_preloaded_report_notes()
1049 defp maybe_preload_report_notes(query, _), do: query
1051 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1053 defp maybe_set_thread_muted_field(query, opts) do
1055 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1058 defp maybe_order(query, %{order: :desc}) do
1060 |> order_by(desc: :id)
1063 defp maybe_order(query, %{order: :asc}) do
1065 |> order_by(asc: :id)
1068 defp maybe_order(query, _), do: query
1070 defp fetch_activities_query_ap_ids_ops(opts) do
1071 source_user = opts[:muting_user]
1072 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1074 ap_id_relationships =
1075 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1076 [:block | ap_id_relationships]
1081 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1083 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1084 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1086 restrict_muted_reblogs_opts =
1087 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1089 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1092 def fetch_activities_query(recipients, opts \\ %{}) do
1093 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1094 fetch_activities_query_ap_ids_ops(opts)
1097 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1101 |> maybe_preload_objects(opts)
1102 |> maybe_preload_bookmarks(opts)
1103 |> maybe_preload_report_notes(opts)
1104 |> maybe_set_thread_muted_field(opts)
1105 |> maybe_order(opts)
1106 |> restrict_recipients(recipients, opts[:user])
1107 |> restrict_replies(opts)
1108 |> restrict_tag(opts)
1109 |> restrict_tag_reject(opts)
1110 |> restrict_tag_all(opts)
1111 |> restrict_since(opts)
1112 |> restrict_local(opts)
1113 |> restrict_actor(opts)
1114 |> restrict_type(opts)
1115 |> restrict_state(opts)
1116 |> restrict_favorited_by(opts)
1117 |> restrict_blocked(restrict_blocked_opts)
1118 |> restrict_muted(restrict_muted_opts)
1119 |> restrict_filtered(opts)
1120 |> restrict_media(opts)
1121 |> restrict_visibility(opts)
1122 |> restrict_thread_visibility(opts, config)
1123 |> restrict_reblogs(opts)
1124 |> restrict_pinned(opts)
1125 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1126 |> restrict_instance(opts)
1127 |> restrict_announce_object_actor(opts)
1128 |> restrict_filtered(opts)
1129 |> Activity.restrict_deactivated_users()
1130 |> exclude_poll_votes(opts)
1131 |> exclude_chat_messages(opts)
1132 |> exclude_invisible_actors(opts)
1133 |> exclude_visibility(opts)
1136 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1137 list_memberships = Pleroma.List.memberships(opts[:user])
1139 fetch_activities_query(recipients ++ list_memberships, opts)
1140 |> Pagination.fetch_paginated(opts, pagination)
1142 |> maybe_update_cc(list_memberships, opts[:user])
Fetches the activities favourited by the user, ordered by when they were added to favourites.
1148 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1149 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1151 |> Activity.Queries.by_actor()
1152 |> Activity.Queries.by_type("Like")
1153 |> Activity.with_joined_object()
1154 |> Object.with_joined_activity()
1155 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1156 |> order_by([like, _, _], desc_nulls_last: like.id)
1157 |> Pagination.fetch_paginated(
1158 Map.merge(params, %{skip_order: true}),
1163 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1164 Enum.map(activities, fn
1165 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1166 if Enum.any?(bcc, &(&1 in list_memberships)) do
1167 update_in(activity.data["cc"], &[user_ap_id | &1])
1177 defp maybe_update_cc(activities, _, _), do: activities
1179 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1180 from(activity in query,
1182 fragment("? && ?", activity.recipients, ^recipients) or
1183 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1184 ^Constants.as_public() in activity.recipients)
1188 def fetch_activities_bounded(
1190 recipients_with_public,
1192 pagination \\ :keyset
1194 fetch_activities_query([], opts)
1195 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1196 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` wrapping the
resulting data.

`opts[:actor]` is passed to `Maps.put_if_present/3` under the `"actor"` key
before the object is inserted. Any result of `Upload.store/2` other than
`{:ok, data}` is returned to the caller unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1209 @spec get_actor_url(any()) :: binary() | nil
1210 defp get_actor_url(url) when is_binary(url), do: url
1211 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1213 defp get_actor_url(url) when is_list(url) do
1219 defp get_actor_url(_url), do: nil
1221 defp object_to_user_data(data) do
1223 data["icon"]["url"] &&
1226 "url" => [%{"href" => data["icon"]["url"]}]
1230 data["image"]["url"] &&
1233 "url" => [%{"href" => data["image"]["url"]}]
1238 |> Map.get("attachment", [])
1239 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1240 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1244 |> Map.get("tag", [])
1246 %{"type" => "Emoji"} -> true
1249 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1250 {String.trim(name, ":"), url}
1253 is_locked = data["manuallyApprovesFollowers"] || false
1254 capabilities = data["capabilities"] || %{}
1255 accepts_chat_messages = capabilities["acceptsChatMessages"]
1256 data = Transmogrifier.maybe_fix_user_object(data)
1257 is_discoverable = data["discoverable"] || false
1258 invisible = data["invisible"] || false
1259 actor_type = data["type"] || "Person"
1262 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1263 data["publicKey"]["publicKeyPem"]
1269 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1270 data["endpoints"]["sharedInbox"]
1277 uri: get_actor_url(data["url"]),
1282 is_locked: is_locked,
1283 is_discoverable: is_discoverable,
1284 invisible: invisible,
1287 follower_address: data["followers"],
1288 following_address: data["following"],
1289 bio: data["summary"] || "",
1290 actor_type: actor_type,
1291 also_known_as: Map.get(data, "alsoKnownAs", []),
1292 public_key: public_key,
1293 inbox: data["inbox"],
1294 shared_inbox: shared_inbox,
1295 accepts_chat_messages: accepts_chat_messages
1298 # nickname can be nil because of virtual actors
1299 if data["preferredUsername"] do
1303 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1306 Map.put(user_data, :nickname, nil)
1310 def fetch_follow_information_for_user(user) do
1311 with {:ok, following_data} <-
1312 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1313 {:ok, hide_follows} <- collection_private(following_data),
1314 {:ok, followers_data} <-
1315 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1316 {:ok, hide_followers} <- collection_private(followers_data) do
1319 hide_follows: hide_follows,
1320 follower_count: normalize_counter(followers_data["totalItems"]),
1321 following_count: normalize_counter(following_data["totalItems"]),
1322 hide_followers: hide_followers
1325 {:error, _} = e -> e
# Sanitizes a collection's "totalItems" value before it is used as a
# follower/following counter.
#
# Only non-negative integers are accepted. Everything else (a missing value,
# strings, floats, and negative integers) is normalized to 0, so a malformed
# remote collection cannot produce a negative count.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1333 def maybe_update_follow_information(user_data) do
1334 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1335 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1337 {:collections_available,
1338 !!(user_data[:following_address] && user_data[:follower_address])},
1340 fetch_follow_information_for_user(user_data) do
1341 info = Map.merge(user_data[:info] || %{}, info)
1344 |> Map.put(:info, info)
1346 {:user_type_check, false} ->
1349 {:collections_available, false} ->
1352 {:enabled, false} ->
1357 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1364 defp collection_private(%{"first" => %{"type" => type}})
1365 when type in ["CollectionPage", "OrderedCollectionPage"],
1368 defp collection_private(%{"first" => first}) do
1369 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1370 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1373 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1374 {:error, _} = e -> e
1379 defp collection_private(_data), do: {:ok, true}
1381 def user_data_from_user_object(data) do
1382 with {:ok, data} <- MRF.filter(data) do
1383 {:ok, object_to_user_data(data)}
1389 def fetch_and_prepare_user_from_ap_id(ap_id) do
1390 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1391 {:ok, data} <- user_data_from_user_object(data) do
1392 {:ok, maybe_update_follow_information(data)}
1394 # If this has been deleted, only log a debug and not an error
1395 {:error, "Object has been deleted" = e} ->
1396 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1399 {:error, {:reject, reason} = e} ->
1400 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1404 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1409 def maybe_handle_clashing_nickname(data) do
1410 with nickname when is_binary(nickname) <- data[:nickname],
1411 %User{} = old_user <- User.get_by_nickname(nickname),
1412 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1414 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1420 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1421 |> User.update_and_set_cache()
1423 {:ap_id_comparison, true} ->
1425 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1433 def make_user_from_ap_id(ap_id) do
1434 user = User.get_cached_by_ap_id(ap_id)
1436 if user && !User.ap_enabled?(user) do
1437 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1439 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1442 |> User.remote_user_changeset(data)
1443 |> User.update_and_set_cache()
1445 maybe_handle_clashing_nickname(data)
1448 |> User.remote_user_changeset()
1456 def make_user_from_nickname(nickname) do
1457 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1458 make_user_from_ap_id(ap_id)
1460 _e -> {:error, "No AP id in WebFinger"}
# Filters out broken threads by delegating to
# `entire_thread_visible_for_user?/2` for the given activity and user.
defp contain_broken_threads(activity = %Activity{}, user = %User{}),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity as seen by `user`. The only step
# it performs is the broken-thread check, whose result it returns.
def contain_activity(activity = %Activity{}, user = %User{}),
  do: contain_broken_threads(activity, user)
1474 def fetch_direct_messages_query do
1476 |> restrict_type(%{type: "Create"})
1477 |> restrict_visibility(%{visibility: "direct"})
1478 |> order_by([activity], asc: activity.id)