1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
54 defp check_actor_is_active(nil), do: true
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
63 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
64 limit = Config.get([:instance, :remote_limit])
65 String.length(content) <= limit
68 defp check_remote_limit(_), do: true
70 def increase_note_count_if_public(actor, object) do
71 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
74 def decrease_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
87 defp increase_replies_count_if_reply(_create_data), do: :noop
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
102 Repo.insert(%Activity{
105 recipients: recipients,
106 actor: object["actor"]
108 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
135 %Activity{} = activity ->
141 {:containment, _} = error ->
144 {:error, _} = error ->
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
152 recipients: recipients,
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
167 defp insert_activity_with_expiration(data, local, recipients) do
171 actor: data["actor"],
172 recipients: recipients
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
186 stream_out_participations(participations)
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
211 defp get_participations({:ok, conversation}) do
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
217 defp get_participations(_), do: []
219 def stream_out_participations(participations) do
222 |> Repo.preload(:user)
224 Streamer.stream("participation", participations)
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
243 def stream_out_participations(_, _), do: :noop
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
252 def stream_out(_activity) do
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
272 %{to: to, actor: actor, published: published, context: context, object: object},
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
285 {:quick_insert, true, activity} ->
288 {:fake, true, activity} ->
292 Repo.rollback(message)
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
305 %{to: to, actor: actor, published: published, context: context, object: object},
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
335 {:error, error} -> Repo.rollback(error)
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
359 additional = params[:additional] || %{}
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
365 Map.merge(additional, %{"to" => [], "cc" => []})
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
384 {:error, error} -> Repo.rollback(error)
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
432 "?->>'type' = ? and ?->>'context' = ?",
439 |> exclude_poll_votes(opts)
441 |> order_by([activity], desc: activity.id)
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
447 |> fetch_activities_for_context_query(opts)
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
479 @valid_visibilities ~w[direct unlisted public private]
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
488 "activity_visibility(?, ?, ?) = ANY (?)",
496 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
514 defp restrict_visibility(query, _visibility), do: query
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not exclude visibility to #{visibility}")
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
542 "activity_visibility(?, ?, ?) = ?",
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
557 defp exclude_visibility(query, _visibility), do: query
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
572 defp restrict_thread_visibility(query, _, _), do: query
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
581 godmode: params[:godmode],
582 reading_user: reading_user
584 |> user_activities_recipients()
585 |> fetch_activities(params)
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
598 if User.blocks?(reading_user, user) do
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
607 godmode: params[:godmode],
608 reading_user: reading_user
610 |> user_activities_recipients()
611 |> fetch_activities(params, :offset)
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
619 godmode: params[:godmode],
620 reading_user: reading_user
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
627 defp user_activities_recipients(%{godmode: true}), do: []
629 defp user_activities_recipients(%{reading_user: reading_user}) do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
633 [Constants.as_public()]
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
643 [activity, object] in query,
646 "?->>'type' != ? or ?->>'actor' != ?",
655 defp restrict_announce_object_actor(query, _), do: query
657 defp restrict_since(query, %{since_id: ""}), do: query
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
663 defp restrict_since(query, _), do: query
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise "Can't use the child object without preloading!"
669 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
676 defp restrict_tag_reject(query, _), do: query
678 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
679 raise "Can't use the child object without preloading!"
682 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
689 defp restrict_tag_all(query, _), do: query
691 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
692 raise "Can't use the child object without preloading!"
695 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
697 [_activity, object] in query,
698 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
702 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
709 defp restrict_tag(query, _), do: query
711 defp restrict_recipients(query, [], _user), do: query
713 defp restrict_recipients(query, recipients, nil) do
714 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
717 defp restrict_recipients(query, recipients, user) do
720 where: fragment("? && ?", ^recipients, activity.recipients),
721 or_where: activity.actor == ^user.ap_id
725 defp restrict_local(query, %{local_only: true}) do
726 from(activity in query, where: activity.local == true)
729 defp restrict_local(query, _), do: query
731 defp restrict_actor(query, %{actor_id: actor_id}) do
732 from(activity in query, where: activity.actor == ^actor_id)
735 defp restrict_actor(query, _), do: query
737 defp restrict_type(query, %{type: type}) when is_binary(type) do
738 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
741 defp restrict_type(query, %{type: type}) do
742 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
745 defp restrict_type(query, _), do: query
747 defp restrict_state(query, %{state: state}) do
748 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
751 defp restrict_state(query, _), do: query
753 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
755 [_activity, object] in query,
756 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
760 defp restrict_favorited_by(query, _), do: query
762 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
763 raise "Can't use the child object without preloading!"
766 defp restrict_media(query, %{only_media: true}) do
768 [activity, object] in query,
769 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
770 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
774 defp restrict_media(query, _), do: query
776 defp restrict_replies(query, %{exclude_replies: true}) do
778 [_activity, object] in query,
779 where: fragment("?->>'inReplyTo' is null", object.data)
783 defp restrict_replies(query, %{
784 reply_filtering_user: %User{} = user,
785 reply_visibility: "self"
788 [activity, object] in query,
791 "?->>'inReplyTo' is null OR ? = ANY(?)",
799 defp restrict_replies(query, %{
800 reply_filtering_user: %User{} = user,
801 reply_visibility: "following"
804 [activity, object] in query,
808 ?->>'type' != 'Create' -- This isn't a Create
809 OR ?->>'inReplyTo' is null -- this isn't a reply
810 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
811 -- unless they are the author (because authors
812 -- are also part of the recipients). This leads
813 -- to a bug that self-replies by friends won't
815 OR ? = ? -- The actor is us
819 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
828 defp restrict_replies(query, _), do: query
830 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
831 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
834 defp restrict_reblogs(query, _), do: query
836 defp restrict_muted(query, %{with_muted: true}), do: query
838 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
839 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
842 from([activity] in query,
843 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
846 "not (?->'to' \\?| ?) or ? = ?",
854 unless opts[:skip_preload] do
855 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
861 defp restrict_muted(query, _), do: query
863 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
864 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
865 domain_blocks = user.domain_blocks || []
867 following_ap_ids = User.get_friends_ap_ids(user)
870 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
873 [activity, object: o] in query,
874 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
877 "((not (? && ?)) or ? = ?)",
885 "recipients_contain_blocked_domains(?, ?) = false",
891 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
898 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
906 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
915 defp restrict_blocked(query, _), do: query
917 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
922 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
924 ^[Constants.as_public()]
929 defp restrict_unlisted(query, _), do: query
931 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
932 from(activity in query, where: activity.id in ^ids)
935 defp restrict_pinned(query, _), do: query
937 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
938 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
944 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
952 defp restrict_muted_reblogs(query, _), do: query
954 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
957 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
961 defp restrict_instance(query, _), do: query
963 defp restrict_filtered(query, %{user: %User{} = user}) do
964 case Filter.compose_regex(user) do
969 from([activity, object] in query,
971 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
972 activity.actor == ^user.ap_id
977 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
978 restrict_filtered(query, %{user: user})
981 defp restrict_filtered(query, _), do: query
983 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
985 defp exclude_poll_votes(query, _) do
986 if has_named_binding?(query, :object) do
987 from([activity, object: o] in query,
988 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
995 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
997 defp exclude_chat_messages(query, _) do
998 if has_named_binding?(query, :object) do
999 from([activity, object: o] in query,
1000 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1007 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1009 defp exclude_invisible_actors(query, _opts) do
1011 User.Query.build(%{invisible: true, select: [:ap_id]})
1013 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1015 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1018 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1019 from(activity in query, where: activity.id != ^id)
1022 defp exclude_id(query, _), do: query
1024 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1026 defp maybe_preload_objects(query, _) do
1028 |> Activity.with_preloaded_object()
1031 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1033 defp maybe_preload_bookmarks(query, opts) do
1035 |> Activity.with_preloaded_bookmark(opts[:user])
1038 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1040 |> Activity.with_preloaded_report_notes()
1043 defp maybe_preload_report_notes(query, _), do: query
1045 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1047 defp maybe_set_thread_muted_field(query, opts) do
1049 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1052 defp maybe_order(query, %{order: :desc}) do
1054 |> order_by(desc: :id)
1057 defp maybe_order(query, %{order: :asc}) do
1059 |> order_by(asc: :id)
1062 defp maybe_order(query, _), do: query
1064 defp fetch_activities_query_ap_ids_ops(opts) do
1065 source_user = opts[:muting_user]
1066 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1068 ap_id_relationships =
1069 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1070 [:block | ap_id_relationships]
1075 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1077 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1078 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1080 restrict_muted_reblogs_opts =
1081 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1083 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1086 def fetch_activities_query(recipients, opts \\ %{}) do
1087 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1088 fetch_activities_query_ap_ids_ops(opts)
1091 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1095 |> maybe_preload_objects(opts)
1096 |> maybe_preload_bookmarks(opts)
1097 |> maybe_preload_report_notes(opts)
1098 |> maybe_set_thread_muted_field(opts)
1099 |> maybe_order(opts)
1100 |> restrict_recipients(recipients, opts[:user])
1101 |> restrict_replies(opts)
1102 |> restrict_tag(opts)
1103 |> restrict_tag_reject(opts)
1104 |> restrict_tag_all(opts)
1105 |> restrict_since(opts)
1106 |> restrict_local(opts)
1107 |> restrict_actor(opts)
1108 |> restrict_type(opts)
1109 |> restrict_state(opts)
1110 |> restrict_favorited_by(opts)
1111 |> restrict_blocked(restrict_blocked_opts)
1112 |> restrict_muted(restrict_muted_opts)
1113 |> restrict_filtered(opts)
1114 |> restrict_media(opts)
1115 |> restrict_visibility(opts)
1116 |> restrict_thread_visibility(opts, config)
1117 |> restrict_reblogs(opts)
1118 |> restrict_pinned(opts)
1119 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1120 |> restrict_instance(opts)
1121 |> restrict_announce_object_actor(opts)
1122 |> restrict_filtered(opts)
1123 |> Activity.restrict_deactivated_users()
1124 |> exclude_poll_votes(opts)
1125 |> exclude_chat_messages(opts)
1126 |> exclude_invisible_actors(opts)
1127 |> exclude_visibility(opts)
1130 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1131 list_memberships = Pleroma.List.memberships(opts[:user])
1133 fetch_activities_query(recipients ++ list_memberships, opts)
1134 |> Pagination.fetch_paginated(opts, pagination)
1136 |> maybe_update_cc(list_memberships, opts[:user])
1140 Fetches the activities a user has favorited, ordered by when they were added to favorites
1142 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1143 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1145 |> Activity.Queries.by_actor()
1146 |> Activity.Queries.by_type("Like")
1147 |> Activity.with_joined_object()
1148 |> Object.with_joined_activity()
1149 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1150 |> order_by([like, _, _], desc_nulls_last: like.id)
1151 |> Pagination.fetch_paginated(
1152 Map.merge(params, %{skip_order: true}),
1157 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1158 Enum.map(activities, fn
1159 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1160 if Enum.any?(bcc, &(&1 in list_memberships)) do
1161 update_in(activity.data["cc"], &[user_ap_id | &1])
1171 defp maybe_update_cc(activities, _, _), do: activities
1173 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1174 from(activity in query,
1176 fragment("? && ?", activity.recipients, ^recipients) or
1177 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1178 ^Constants.as_public() in activity.recipients)
1182 def fetch_activities_bounded(
1184 recipients_with_public,
1186 pagination \\ :keyset
1188 fetch_activities_query([], opts)
1189 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1190 |> Pagination.fetch_paginated(opts, pagination)
1194 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1195 def upload(file, opts \\ []) do
1196 with {:ok, data} <- Upload.store(file, opts) do
1197 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1199 Repo.insert(%Object{data: obj_data})
1203 @spec get_actor_url(any()) :: binary() | nil
1204 defp get_actor_url(url) when is_binary(url), do: url
1205 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1207 defp get_actor_url(url) when is_list(url) do
1213 defp get_actor_url(_url), do: nil
1215 defp object_to_user_data(data) do
1217 data["icon"]["url"] &&
1220 "url" => [%{"href" => data["icon"]["url"]}]
1224 data["image"]["url"] &&
1227 "url" => [%{"href" => data["image"]["url"]}]
1232 |> Map.get("attachment", [])
1233 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1234 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1238 |> Map.get("tag", [])
1240 %{"type" => "Emoji"} -> true
1243 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1244 {String.trim(name, ":"), url}
1247 is_locked = data["manuallyApprovesFollowers"] || false
1248 capabilities = data["capabilities"] || %{}
1249 accepts_chat_messages = capabilities["acceptsChatMessages"]
1250 data = Transmogrifier.maybe_fix_user_object(data)
1251 is_discoverable = data["discoverable"] || false
1252 invisible = data["invisible"] || false
1253 actor_type = data["type"] || "Person"
1256 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1257 data["publicKey"]["publicKeyPem"]
1263 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1264 data["endpoints"]["sharedInbox"]
1271 uri: get_actor_url(data["url"]),
1276 is_locked: is_locked,
1277 is_discoverable: is_discoverable,
1278 invisible: invisible,
1281 follower_address: data["followers"],
1282 following_address: data["following"],
1283 bio: data["summary"] || "",
1284 actor_type: actor_type,
1285 also_known_as: Map.get(data, "alsoKnownAs", []),
1286 public_key: public_key,
1287 inbox: data["inbox"],
1288 shared_inbox: shared_inbox,
1289 accepts_chat_messages: accepts_chat_messages
1292 # nickname can be nil because of virtual actors
1293 if data["preferredUsername"] do
1297 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1300 Map.put(user_data, :nickname, nil)
1304 def fetch_follow_information_for_user(user) do
1305 with {:ok, following_data} <-
1306 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1307 {:ok, hide_follows} <- collection_private(following_data),
1308 {:ok, followers_data} <-
1309 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1310 {:ok, hide_followers} <- collection_private(followers_data) do
1313 hide_follows: hide_follows,
1314 follower_count: normalize_counter(followers_data["totalItems"]),
1315 following_count: normalize_counter(following_data["totalItems"]),
1316 hide_followers: hide_followers
1319 {:error, _} = e -> e
# Coerces a counter value into an integer: an integer is returned unchanged,
# while any other value (for example nil or a string) is replaced with 0.
# Applied to the "totalItems" field of the fetched followers/following
# collections before it is stored as a follower/following count.
1324 defp normalize_counter(counter) when is_integer(counter), do: counter
1325 defp normalize_counter(_), do: 0
1327 def maybe_update_follow_information(user_data) do
1328 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1329 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1331 {:collections_available,
1332 !!(user_data[:following_address] && user_data[:follower_address])},
1334 fetch_follow_information_for_user(user_data) do
1335 info = Map.merge(user_data[:info] || %{}, info)
1338 |> Map.put(:info, info)
1340 {:user_type_check, false} ->
1343 {:collections_available, false} ->
1346 {:enabled, false} ->
1351 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1358 defp collection_private(%{"first" => %{"type" => type}})
1359 when type in ["CollectionPage", "OrderedCollectionPage"],
1362 defp collection_private(%{"first" => first}) do
1363 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1364 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1367 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1368 {:error, _} = e -> e
1373 defp collection_private(_data), do: {:ok, true}
1375 def user_data_from_user_object(data) do
1376 with {:ok, data} <- MRF.filter(data) do
1377 {:ok, object_to_user_data(data)}
1383 def fetch_and_prepare_user_from_ap_id(ap_id) do
1384 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1385 {:ok, data} <- user_data_from_user_object(data) do
1386 {:ok, maybe_update_follow_information(data)}
1388 # If this has been deleted, only log a debug and not an error
1389 {:error, "Object has been deleted" = e} ->
1390 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1393 {:error, {:reject, reason} = e} ->
1394 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1398 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1403 def maybe_handle_clashing_nickname(data) do
1404 with nickname when is_binary(nickname) <- data[:nickname],
1405 %User{} = old_user <- User.get_by_nickname(nickname),
1406 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1408 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1414 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1415 |> User.update_and_set_cache()
1417 {:ap_id_comparison, true} ->
1419 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1427 def make_user_from_ap_id(ap_id) do
1428 user = User.get_cached_by_ap_id(ap_id)
1430 if user && !User.ap_enabled?(user) do
1431 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1433 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1436 |> User.remote_user_changeset(data)
1437 |> User.update_and_set_cache()
1439 maybe_handle_clashing_nickname(data)
1442 |> User.remote_user_changeset()
1450 def make_user_from_nickname(nickname) do
1451 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1452 make_user_from_ap_id(ap_id)
1454 _e -> {:error, "No AP id in WebFinger"}
1458 # filter out broken threads
1459 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1460 entire_thread_visible_for_user?(activity, user)
1463 # do post-processing on a specific activity
1464 def contain_activity(%Activity{} = activity, %User{} = user) do
1465 contain_broken_threads(activity, user)
1468 def fetch_direct_messages_query do
1470 |> restrict_type(%{type: "Create"})
1471 |> restrict_visibility(%{visibility: "direct"})
1472 |> order_by([activity], asc: activity.id)