1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = User.get_cached_by_ap_id(data["actor"])
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
47 user -> User.following?(user, actor)
54 defp get_recipients(%{"type" => "Create"} = data) do
55 to = Map.get(data, "to", [])
56 cc = Map.get(data, "cc", [])
57 bcc = Map.get(data, "bcc", [])
58 actor = Map.get(data, "actor", [])
59 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
63 defp get_recipients(data) do
64 to = Map.get(data, "to", [])
65 cc = Map.get(data, "cc", [])
66 bcc = Map.get(data, "bcc", [])
67 recipients = Enum.concat([to, cc, bcc])
71 defp check_actor_is_active(nil), do: true
73 defp check_actor_is_active(actor) when is_binary(actor) do
74 case User.get_cached_by_ap_id(actor) do
75 %User{deactivated: deactivated} -> not deactivated
80 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
81 limit = Config.get([:instance, :remote_limit])
82 String.length(content) <= limit
85 defp check_remote_limit(_), do: true
87 defp increase_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
91 def decrease_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
95 defp increase_replies_count_if_reply(%{
96 "object" => %{"inReplyTo" => reply_ap_id} = object,
99 if is_public?(object) do
100 Object.increase_replies_count(reply_ap_id)
104 defp increase_replies_count_if_reply(_create_data), do: :noop
106 defp increase_poll_votes_if_vote(%{
107 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
111 Object.increase_vote_count(reply_ap_id, name, actor)
114 defp increase_poll_votes_if_vote(_create_data), do: :noop
116 @object_types ["ChatMessage"]
117 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
118 def persist(%{"type" => type} = object, meta) when type in @object_types do
119 with {:ok, object} <- Object.create(object) do
124 def persist(object, meta) do
125 with local <- Keyword.fetch!(meta, :local),
126 {recipients, _, _} <- get_recipients(object),
128 Repo.insert(%Activity{
131 recipients: recipients,
132 actor: object["actor"]
134 {:ok, activity, meta}
138 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
139 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
140 with nil <- Activity.normalize(map),
141 map <- lazy_put_activity_defaults(map, fake),
142 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
143 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
144 {:ok, map} <- MRF.filter(map),
145 {recipients, _, _} = get_recipients(map),
146 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
147 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
148 {:ok, map, object} <- insert_full_object(map) do
154 recipients: recipients
157 |> maybe_create_activity_expiration()
159 # Splice in the child object if we have one.
160 activity = Maps.put_if_present(activity, :object, object)
162 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
166 %Activity{} = activity ->
169 {:fake, true, map, recipients} ->
170 activity = %Activity{
174 recipients: recipients,
178 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
192 stream_out_participations(participations)
195 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
196 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
201 defp maybe_create_activity_expiration(result), do: result
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
211 defp get_participations({:ok, conversation}) do
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
217 defp get_participations(_), do: []
219 def stream_out_participations(participations) do
222 |> Repo.preload(:user)
224 Streamer.stream("participation", participations)
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
232 fetch_latest_activity_id_for_context(conversation.ap_id, %{
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
243 def stream_out_participations(_, _), do: :noop
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
252 def stream_out(_activity) do
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
272 %{to: to, actor: actor, published: published, context: context, object: object},
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 _ <- increase_poll_votes_if_vote(create_data),
280 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
281 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
282 _ <- notify_and_stream(activity),
283 :ok <- maybe_federate(activity) do
286 {:quick_insert, true, activity} ->
289 {:fake, true, activity} ->
293 Repo.rollback(message)
297 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
298 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
299 additional = params[:additional] || %{}
300 # only accept false as false value
301 local = !(params[:local] == false)
302 published = params[:published]
306 %{to: to, actor: actor, published: published, context: context, object: object},
310 with {:ok, activity} <- insert(listen_data, local),
311 _ <- notify_and_stream(activity),
312 :ok <- maybe_federate(activity) do
317 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
318 def accept(params) do
319 accept_or_reject("Accept", params)
322 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
323 def reject(params) do
324 accept_or_reject("Reject", params)
327 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
328 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
329 local = Map.get(params, :local, true)
330 activity_id = Map.get(params, :activity_id, nil)
333 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
334 |> Maps.put_if_present("id", activity_id)
336 with {:ok, activity} <- insert(data, local),
337 _ <- notify_and_stream(activity),
338 :ok <- maybe_federate(activity) do
343 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
344 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
345 local = !(params[:local] == false)
346 activity_id = params[:activity_id]
356 |> Maps.put_if_present("id", activity_id)
358 with {:ok, activity} <- insert(data, local),
359 _ <- notify_and_stream(activity),
360 :ok <- maybe_federate(activity) do
365 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
366 {:ok, Activity.t()} | {:error, any()}
367 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
368 with {:ok, result} <-
369 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
374 defp do_follow(follower, followed, activity_id, local, opts) do
375 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
376 data = make_follow_data(follower, followed, activity_id)
378 with {:ok, activity} <- insert(data, local),
379 _ <- skip_notify_and_stream || notify_and_stream(activity),
380 :ok <- maybe_federate(activity) do
383 {:error, error} -> Repo.rollback(error)
387 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
388 {:ok, Activity.t()} | nil | {:error, any()}
389 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
390 with {:ok, result} <-
391 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
396 defp do_unfollow(follower, followed, activity_id, local) do
397 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
398 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
399 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
400 {:ok, activity} <- insert(unfollow_data, local),
401 _ <- notify_and_stream(activity),
402 :ok <- maybe_federate(activity) do
406 {:error, error} -> Repo.rollback(error)
410 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
411 {:ok, Activity.t()} | {:error, any()}
412 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
413 with {:ok, result} <-
414 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
419 defp do_block(blocker, blocked, activity_id, local) do
420 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
422 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
423 unfollow(blocker, blocked, nil, local)
426 block_data = make_block_data(blocker, blocked, activity_id)
428 with {:ok, activity} <- insert(block_data, local),
429 _ <- notify_and_stream(activity),
430 :ok <- maybe_federate(activity) do
433 {:error, error} -> Repo.rollback(error)
437 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
447 # only accept false as false value
448 local = !(params[:local] == false)
449 forward = !(params[:forward] == false)
451 additional = params[:additional] || %{}
455 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
457 Map.merge(additional, %{"to" => [], "cc" => []})
460 with flag_data <- make_flag_data(params, additional),
461 {:ok, activity} <- insert(flag_data, local),
462 {:ok, stripped_activity} <- strip_report_status_data(activity),
463 _ <- notify_and_stream(activity),
464 :ok <- maybe_federate(stripped_activity) do
465 User.all_superusers()
466 |> Enum.filter(fn user -> not is_nil(user.email) end)
467 |> Enum.each(fn superuser ->
469 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
470 |> Pleroma.Emails.Mailer.deliver_async()
477 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
478 def move(%User{} = origin, %User{} = target, local \\ true) do
481 "actor" => origin.ap_id,
482 "object" => origin.ap_id,
483 "target" => target.ap_id
486 with true <- origin.ap_id in target.also_known_as,
487 {:ok, activity} <- insert(params, local),
488 _ <- notify_and_stream(activity) do
489 maybe_federate(activity)
491 BackgroundWorker.enqueue("move_following", %{
492 "origin_id" => origin.id,
493 "target_id" => target.id
498 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
503 def fetch_activities_for_context_query(context, opts) do
504 public = [Constants.as_public()]
508 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
511 from(activity in Activity)
512 |> maybe_preload_objects(opts)
513 |> maybe_preload_bookmarks(opts)
514 |> maybe_set_thread_muted_field(opts)
515 |> restrict_blocked(opts)
516 |> restrict_recipients(recipients, opts[:user])
520 "?->>'type' = ? and ?->>'context' = ?",
527 |> exclude_poll_votes(opts)
529 |> order_by([activity], desc: activity.id)
532 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
533 def fetch_activities_for_context(context, opts \\ %{}) do
535 |> fetch_activities_for_context_query(opts)
539 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
540 FlakeId.Ecto.CompatType.t() | nil
541 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
543 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
549 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
550 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
551 opts = Map.delete(opts, :user)
553 [Constants.as_public()]
554 |> fetch_activities_query(opts)
555 |> restrict_unlisted(opts)
556 |> Pagination.fetch_paginated(opts, pagination)
559 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
560 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
562 |> Map.put(:restrict_unlisted, true)
563 |> fetch_public_or_unlisted_activities(pagination)
566 @valid_visibilities ~w[direct unlisted public private]
568 defp restrict_visibility(query, %{visibility: visibility})
569 when is_list(visibility) do
570 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
575 "activity_visibility(?, ?, ?) = ANY (?)",
583 Logger.error("Could not restrict visibility to #{visibility}")
587 defp restrict_visibility(query, %{visibility: visibility})
588 when visibility in @valid_visibilities do
592 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
596 defp restrict_visibility(_query, %{visibility: visibility})
597 when visibility not in @valid_visibilities do
598 Logger.error("Could not restrict visibility to #{visibility}")
601 defp restrict_visibility(query, _visibility), do: query
603 defp exclude_visibility(query, %{exclude_visibilities: visibility})
604 when is_list(visibility) do
605 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
610 "activity_visibility(?, ?, ?) = ANY (?)",
618 Logger.error("Could not exclude visibility to #{visibility}")
623 defp exclude_visibility(query, %{exclude_visibilities: visibility})
624 when visibility in @valid_visibilities do
629 "activity_visibility(?, ?, ?) = ?",
638 defp exclude_visibility(query, %{exclude_visibilities: visibility})
639 when visibility not in [nil | @valid_visibilities] do
640 Logger.error("Could not exclude visibility to #{visibility}")
644 defp exclude_visibility(query, _visibility), do: query
646 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
649 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
652 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
655 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
659 defp restrict_thread_visibility(query, _, _), do: query
661 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
664 |> Map.put(:user, reading_user)
665 |> Map.put(:actor_id, user.ap_id)
668 godmode: params[:godmode],
669 reading_user: reading_user
671 |> user_activities_recipients()
672 |> fetch_activities(params)
676 def fetch_user_activities(user, reading_user, params \\ %{}) do
679 |> Map.put(:type, ["Create", "Announce"])
680 |> Map.put(:user, reading_user)
681 |> Map.put(:actor_id, user.ap_id)
682 |> Map.put(:pinned_activity_ids, user.pinned_activities)
685 if User.blocks?(reading_user, user) do
689 |> Map.put(:blocking_user, reading_user)
690 |> Map.put(:muting_user, reading_user)
694 godmode: params[:godmode],
695 reading_user: reading_user
697 |> user_activities_recipients()
698 |> fetch_activities(params)
702 def fetch_statuses(reading_user, params) do
703 params = Map.put(params, :type, ["Create", "Announce"])
706 godmode: params[:godmode],
707 reading_user: reading_user
709 |> user_activities_recipients()
710 |> fetch_activities(params, :offset)
714 defp user_activities_recipients(%{godmode: true}), do: []
716 defp user_activities_recipients(%{reading_user: reading_user}) do
718 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
720 [Constants.as_public()]
724 defp restrict_since(query, %{since_id: ""}), do: query
726 defp restrict_since(query, %{since_id: since_id}) do
727 from(activity in query, where: activity.id > ^since_id)
730 defp restrict_since(query, _), do: query
732 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
733 raise "Can't use the child object without preloading!"
736 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
738 [_activity, object] in query,
739 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
743 defp restrict_tag_reject(query, _), do: query
745 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
746 raise "Can't use the child object without preloading!"
749 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
751 [_activity, object] in query,
752 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
756 defp restrict_tag_all(query, _), do: query
758 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
759 raise "Can't use the child object without preloading!"
762 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
764 [_activity, object] in query,
765 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
769 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
771 [_activity, object] in query,
772 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
776 defp restrict_tag(query, _), do: query
778 defp restrict_recipients(query, [], _user), do: query
780 defp restrict_recipients(query, recipients, nil) do
781 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
784 defp restrict_recipients(query, recipients, user) do
787 where: fragment("? && ?", ^recipients, activity.recipients),
788 or_where: activity.actor == ^user.ap_id
792 defp restrict_local(query, %{local_only: true}) do
793 from(activity in query, where: activity.local == true)
796 defp restrict_local(query, _), do: query
798 defp restrict_actor(query, %{actor_id: actor_id}) do
799 from(activity in query, where: activity.actor == ^actor_id)
802 defp restrict_actor(query, _), do: query
804 defp restrict_type(query, %{type: type}) when is_binary(type) do
805 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
808 defp restrict_type(query, %{type: type}) do
809 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
812 defp restrict_type(query, _), do: query
814 defp restrict_state(query, %{state: state}) do
815 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
818 defp restrict_state(query, _), do: query
820 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
822 [_activity, object] in query,
823 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
827 defp restrict_favorited_by(query, _), do: query
829 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
830 raise "Can't use the child object without preloading!"
833 defp restrict_media(query, %{only_media: true}) do
835 [_activity, object] in query,
836 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
840 defp restrict_media(query, _), do: query
842 defp restrict_replies(query, %{exclude_replies: true}) do
844 [_activity, object] in query,
845 where: fragment("?->>'inReplyTo' is null", object.data)
849 defp restrict_replies(query, %{
850 reply_filtering_user: user,
851 reply_visibility: "self"
854 [activity, object] in query,
857 "?->>'inReplyTo' is null OR ? = ANY(?)",
865 defp restrict_replies(query, %{
866 reply_filtering_user: user,
867 reply_visibility: "following"
870 [activity, object] in query,
873 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
875 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
884 defp restrict_replies(query, _), do: query
886 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
887 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
890 defp restrict_reblogs(query, _), do: query
892 defp restrict_muted(query, %{with_muted: true}), do: query
894 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
895 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
898 from([activity] in query,
899 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
900 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
903 unless opts[:skip_preload] do
904 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
910 defp restrict_muted(query, _), do: query
912 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
913 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
914 domain_blocks = user.domain_blocks || []
916 following_ap_ids = User.get_friends_ap_ids(user)
919 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
922 [activity, object: o] in query,
923 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
924 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
927 "recipients_contain_blocked_domains(?, ?) = false",
933 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
940 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
948 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
957 defp restrict_blocked(query, _), do: query
959 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
964 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
966 ^[Constants.as_public()]
971 defp restrict_unlisted(query, _), do: query
973 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
974 from(activity in query, where: activity.id in ^ids)
977 defp restrict_pinned(query, _), do: query
979 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
980 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
986 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
994 defp restrict_muted_reblogs(query, _), do: query
996 defp restrict_instance(query, %{instance: instance}) do
1001 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1005 from(activity in query, where: activity.actor in ^users)
1008 defp restrict_instance(query, _), do: query
1010 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1012 defp exclude_poll_votes(query, _) do
1013 if has_named_binding?(query, :object) do
1014 from([activity, object: o] in query,
1015 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1022 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1024 defp exclude_chat_messages(query, _) do
1025 if has_named_binding?(query, :object) do
1026 from([activity, object: o] in query,
1027 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1034 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1036 defp exclude_invisible_actors(query, _opts) do
1038 User.Query.build(%{invisible: true, select: [:ap_id]})
1040 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1042 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1045 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1046 from(activity in query, where: activity.id != ^id)
1049 defp exclude_id(query, _), do: query
1051 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1053 defp maybe_preload_objects(query, _) do
1055 |> Activity.with_preloaded_object()
1058 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1060 defp maybe_preload_bookmarks(query, opts) do
1062 |> Activity.with_preloaded_bookmark(opts[:user])
1065 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1067 |> Activity.with_preloaded_report_notes()
1070 defp maybe_preload_report_notes(query, _), do: query
1072 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1074 defp maybe_set_thread_muted_field(query, opts) do
1076 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1079 defp maybe_order(query, %{order: :desc}) do
1081 |> order_by(desc: :id)
1084 defp maybe_order(query, %{order: :asc}) do
1086 |> order_by(asc: :id)
1089 defp maybe_order(query, _), do: query
1091 defp fetch_activities_query_ap_ids_ops(opts) do
1092 source_user = opts[:muting_user]
1093 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1095 ap_id_relationships =
1096 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1097 [:block | ap_id_relationships]
1102 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1104 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1105 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1107 restrict_muted_reblogs_opts =
1108 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1110 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1113 def fetch_activities_query(recipients, opts \\ %{}) do
1114 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1115 fetch_activities_query_ap_ids_ops(opts)
1118 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1122 |> maybe_preload_objects(opts)
1123 |> maybe_preload_bookmarks(opts)
1124 |> maybe_preload_report_notes(opts)
1125 |> maybe_set_thread_muted_field(opts)
1126 |> maybe_order(opts)
1127 |> restrict_recipients(recipients, opts[:user])
1128 |> restrict_replies(opts)
1129 |> restrict_tag(opts)
1130 |> restrict_tag_reject(opts)
1131 |> restrict_tag_all(opts)
1132 |> restrict_since(opts)
1133 |> restrict_local(opts)
1134 |> restrict_actor(opts)
1135 |> restrict_type(opts)
1136 |> restrict_state(opts)
1137 |> restrict_favorited_by(opts)
1138 |> restrict_blocked(restrict_blocked_opts)
1139 |> restrict_muted(restrict_muted_opts)
1140 |> restrict_media(opts)
1141 |> restrict_visibility(opts)
1142 |> restrict_thread_visibility(opts, config)
1143 |> restrict_reblogs(opts)
1144 |> restrict_pinned(opts)
1145 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1146 |> restrict_instance(opts)
1147 |> Activity.restrict_deactivated_users()
1148 |> exclude_poll_votes(opts)
1149 |> exclude_chat_messages(opts)
1150 |> exclude_invisible_actors(opts)
1151 |> exclude_visibility(opts)
1154 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1155 list_memberships = Pleroma.List.memberships(opts[:user])
1157 fetch_activities_query(recipients ++ list_memberships, opts)
1158 |> Pagination.fetch_paginated(opts, pagination)
1160 |> maybe_update_cc(list_memberships, opts[:user])
1164 Fetch favorites activities of user with order by sort adds to favorites
1166 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1167 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1169 |> Activity.Queries.by_actor()
1170 |> Activity.Queries.by_type("Like")
1171 |> Activity.with_joined_object()
1172 |> Object.with_joined_activity()
1173 |> select([_like, object, activity], %{activity | object: object})
1174 |> order_by([like, _, _], desc_nulls_last: like.id)
1175 |> Pagination.fetch_paginated(
1176 Map.merge(params, %{skip_order: true}),
1182 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1183 Enum.map(activities, fn
1184 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1185 if Enum.any?(bcc, &(&1 in list_memberships)) do
1186 update_in(activity.data["cc"], &[user_ap_id | &1])
1196 defp maybe_update_cc(activities, _, _), do: activities
1198 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1199 from(activity in query,
1201 fragment("? && ?", activity.recipients, ^recipients) or
1202 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1203 ^Constants.as_public() in activity.recipients)
1207 def fetch_activities_bounded(
1209 recipients_with_public,
1211 pagination \\ :keyset
1213 fetch_activities_query([], opts)
1214 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1215 |> Pagination.fetch_paginated(opts, pagination)
1219 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1220 def upload(file, opts \\ []) do
1221 with {:ok, data} <- Upload.store(file, opts) do
1222 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1224 Repo.insert(%Object{data: obj_data})
1228 @spec get_actor_url(any()) :: binary() | nil
1229 defp get_actor_url(url) when is_binary(url), do: url
1230 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1232 defp get_actor_url(url) when is_list(url) do
1238 defp get_actor_url(_url), do: nil
1240 defp object_to_user_data(data) do
1242 data["icon"]["url"] &&
1245 "url" => [%{"href" => data["icon"]["url"]}]
1249 data["image"]["url"] &&
1252 "url" => [%{"href" => data["image"]["url"]}]
1257 |> Map.get("attachment", [])
1258 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1259 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1263 |> Map.get("tag", [])
1265 %{"type" => "Emoji"} -> true
1268 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1269 {String.trim(name, ":"), url}
1272 locked = data["manuallyApprovesFollowers"] || false
1273 data = Transmogrifier.maybe_fix_user_object(data)
1274 discoverable = data["discoverable"] || false
1275 invisible = data["invisible"] || false
1276 actor_type = data["type"] || "Person"
1279 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1280 data["publicKey"]["publicKeyPem"]
1286 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1287 data["endpoints"]["sharedInbox"]
1294 uri: get_actor_url(data["url"]),
1300 discoverable: discoverable,
1301 invisible: invisible,
1304 follower_address: data["followers"],
1305 following_address: data["following"],
1306 bio: data["summary"],
1307 actor_type: actor_type,
1308 also_known_as: Map.get(data, "alsoKnownAs", []),
1309 public_key: public_key,
1310 inbox: data["inbox"],
1311 shared_inbox: shared_inbox
1314 # nickname can be nil because of virtual actors
1315 if data["preferredUsername"] do
1319 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1322 Map.put(user_data, :nickname, nil)
1326 def fetch_follow_information_for_user(user) do
1327 with {:ok, following_data} <-
1328 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1329 {:ok, hide_follows} <- collection_private(following_data),
1330 {:ok, followers_data} <-
1331 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1332 {:ok, hide_followers} <- collection_private(followers_data) do
1335 hide_follows: hide_follows,
1336 follower_count: normalize_counter(followers_data["totalItems"]),
1337 following_count: normalize_counter(following_data["totalItems"]),
1338 hide_followers: hide_followers
1341 {:error, _} = e -> e
1346 defp normalize_counter(counter) when is_integer(counter), do: counter
1347 defp normalize_counter(_), do: 0
1349 def maybe_update_follow_information(user_data) do
1350 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1351 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1353 {:collections_available,
1354 !!(user_data[:following_address] && user_data[:follower_address])},
1356 fetch_follow_information_for_user(user_data) do
1357 info = Map.merge(user_data[:info] || %{}, info)
1360 |> Map.put(:info, info)
1362 {:user_type_check, false} ->
1365 {:collections_available, false} ->
1368 {:enabled, false} ->
1373 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1380 defp collection_private(%{"first" => %{"type" => type}})
1381 when type in ["CollectionPage", "OrderedCollectionPage"],
1384 defp collection_private(%{"first" => first}) do
1385 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1386 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1389 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1390 {:error, _} = e -> e
1395 defp collection_private(_data), do: {:ok, true}
1397 def user_data_from_user_object(data) do
1398 with {:ok, data} <- MRF.filter(data) do
1399 {:ok, object_to_user_data(data)}
1405 def fetch_and_prepare_user_from_ap_id(ap_id) do
1406 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1407 {:ok, data} <- user_data_from_user_object(data) do
1408 {:ok, maybe_update_follow_information(data)}
1410 {:error, "Object has been deleted" = e} ->
1411 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1415 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1420 def make_user_from_ap_id(ap_id) do
1421 user = User.get_cached_by_ap_id(ap_id)
1423 if user && !User.ap_enabled?(user) do
1424 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1426 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1429 |> User.remote_user_changeset(data)
1430 |> User.update_and_set_cache()
1433 |> User.remote_user_changeset()
1441 def make_user_from_nickname(nickname) do
1442 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1443 make_user_from_ap_id(ap_id)
1445 _e -> {:error, "No AP id in WebFinger"}
1449 # filter out broken threads
1450 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1451 entire_thread_visible_for_user?(activity, user)
1454 # do post-processing on a specific activity
1455 def contain_activity(%Activity{} = activity, %User{} = user) do
1456 contain_broken_threads(activity, user)
1459 def fetch_direct_messages_query do
1461 |> restrict_type(%{type: "Create"})
1462 |> restrict_visibility(%{visibility: "direct"})
1463 |> order_by([activity], asc: activity.id)