1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
# get_recipients/1 collects an activity's addressing fields ("to", "cc", "bcc", each
# defaulting to []) into one recipient list. Callers in this file destructure the
# result as a 3-tuple whose first element is that list.
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
# "Create" activities also address their own actor; duplicates are removed.
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
# NOTE(review): a missing "actor" defaults to [], which `[actor]` below turns into [[]],
# i.e. an empty list ends up as a recipient element - confirm this is intended.
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Any other activity type: plain concatenation, no de-duplication.
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
# Looks up a non-nil actor by AP id and requires its `deactivated` flag to be false.
# NOTE(review): `user <- ...` matches any value, so if the cache lookup can return nil
# the `user.deactivated` access on the next line raises instead of falling through to
# an else-branch; matching on `%User{}` would avoid that - confirm against User.
70 defp check_actor_is_active(actor) do
71 if not is_nil(actor) do
72 with user <- User.get_cached_by_ap_id(actor),
73 false <- user.deactivated do
# Compares the length of an object's "content" with the configured
# `[:instance, :remote_limit]`. Activities without object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_other), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public;
otherwise returns `{:ok, actor}` without touching the counter.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public;
otherwise returns `{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
# Increments the replies counter of the object referenced by "inReplyTo" when the
# replying object is public. Data that does not match the first clause yields :noop.
# (Part of the first clause's pattern is not visible in this excerpt.)
98 def increase_replies_count_if_reply(%{
99 "object" => %{"inReplyTo" => reply_ap_id} = object,
102 if is_public?(object) do
103 Object.increase_replies_count(reply_ap_id)
107 def increase_replies_count_if_reply(_create_data), do: :noop
@doc """
Decrements the replies counter of the object referenced by "inReplyTo" when the
given reply object is public (returns `nil` when it is not). Objects without an
"inReplyTo" yield `:noop`.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = data}) do
  if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
# Registers a poll vote: when the created object has both "inReplyTo" and "name",
# the vote count of the replied-to object is increased for that option name.
# Anything else yields :noop. (The pattern lines binding `actor` are not visible here.)
119 def increase_poll_votes_if_vote(%{
120 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
124 Object.increase_vote_count(reply_ap_id, name, actor)
127 def increase_poll_votes_if_vote(_create_data), do: :noop
@doc """
Stores `object` as an `Activity` row.

`meta` must contain a `:local` key (`Keyword.fetch!/2` raises otherwise). The
recipients are derived from the activity's addressing fields.

Returns `{:ok, activity, meta}` on success. Any non-matching intermediate result,
such as an `{:error, changeset}` from `Repo.insert/1`, is returned unchanged.
"""
# The spec previously promised a 2-tuple although the success value is a 3-tuple.
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
# Validates and stores an activity map. Visible steps, in order: require that
# `Activity.normalize/1` finds no existing activity, fill in defaults, check the actor
# is active (skipped with `bypass_actor_check`), enforce the remote content limit, run
# the MRF filter, compute recipients, run the containment check, insert the child
# object and then the Activity row. Afterwards a background job, notifications and
# conversation streaming are triggered.
# An already existing activity and the `fake` case are handled in else-branches.
# NOTE(review): three positional booleans are easy to mix up at call sites
# (see `insert(data, true, true, true)` further down); keyword options would be clearer.
144 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
145 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
146 with nil <- Activity.normalize(map),
147 map <- lazy_put_activity_defaults(map, fake),
148 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
149 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
150 {:ok, map} <- MRF.filter(map),
151 {recipients, _, _} = get_recipients(map),
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
156 Repo.insert(%Activity{
160 recipients: recipients
163 # Splice in the child object if we have one.
165 if not is_nil(object) do
166 Map.put(activity, :object, object)
171 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
173 Notification.create_notifications(activity)
175 conversation = create_or_bump_conversation(activity, map["actor"])
176 participations = get_participations(conversation)
178 stream_out_participations(participations)
# Else-branches: an activity that already exists, and the `fake` short-circuit.
181 %Activity{} = activity ->
184 {:fake, true, map, recipients} ->
185 activity = %Activity{
189 recipients: recipients,
193 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# Creates or bumps the conversation for `activity` and marks it as read for the acting
# user (when that user can be resolved from `actor`).
201 defp create_or_bump_conversation(activity, actor) do
202 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
203 %User{} = user <- User.get_cached_by_ap_id(actor),
204 Participation.mark_as_read(user, conversation) do
# Returns the freshly (force-)preloaded participations of a conversation wrapped in
# `{:ok, conversation}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_other), do: []
@doc """
Preloads the `:user` association of the given participations and pushes them to
the "participation" stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
# Two-argument variant: resolves the conversation for the object's "context" and, when
# a latest activity id can still be fetched for it (with `user` as "blocking_user"),
# streams out the conversation's participations. Other inputs yield :noop.
225 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
226 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
227 conversation = Repo.preload(conversation, :participations),
229 fetch_latest_activity_id_for_context(conversation.ap_id, %{
231 "blocking_user" => user
233 if last_activity_id do
234 stream_out_participations(conversation.participations)
239 def stream_out_participations(_, _), do: :noop
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; the second clause handles every other activity.
241 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
242 when data_type in ["Create", "Announce", "Delete"] do
244 |> Topics.get_activity_topics()
245 |> Streamer.stream(activity)
248 def stream_out(_activity) do
# create/2 runs do_create/2 inside a database transaction.
252 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
253 def create(params, fake \\ false) do
254 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# do_create/2 builds the Create data, inserts it, updates reply/poll/note counters and
# federates. `fake` and the benchmark-only `quick_insert?` short-circuit the chain via
# tagged tuples handled in the else-branch; errors roll the transaction back.
259 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
260 additional = params[:additional] || %{}
# Only an explicit `false` disables `local`; nil/missing means true.
261 # only accept false as false value
262 local = !(params[:local] == false)
263 published = params[:published]
264 quick_insert? = Config.get([:env]) == :benchmark
268 %{to: to, actor: actor, published: published, context: context, object: object},
271 {:ok, activity} <- insert(create_data, local, fake),
272 {:fake, false, activity} <- {:fake, fake, activity},
273 _ <- increase_replies_count_if_reply(create_data),
274 _ <- increase_poll_votes_if_vote(create_data),
275 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
276 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
277 :ok <- maybe_federate(activity) do
280 {:quick_insert, true, activity} ->
283 {:fake, true, activity} ->
287 Repo.rollback(message)
# Builds listen data from the given params, inserts it and federates the activity.
291 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
292 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
293 additional = params[:additional] || %{}
# Only an explicit `false` disables `local`; nil/missing means true.
294 # only accept false as false value
295 local = !(params[:local] == false)
296 published = params[:published]
300 %{to: to, actor: actor, published: published, context: context, object: object},
303 {:ok, activity} <- insert(listen_data, local),
304 :ok <- maybe_federate(activity) do
@doc """
Thin wrapper around `accept_or_reject/2` using the activity type "Accept".
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
@doc """
Thin wrapper around `accept_or_reject/2` using the activity type "Reject".
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
# Builds an activity of the given `type` ("Accept"/"Reject" from the wrappers in this
# file) addressed to `to`, optionally with an explicit "id" (`:activity_id`), then
# inserts and federates it. `:local` defaults to true.
319 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
320 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
321 local = Map.get(params, :local, true)
322 activity_id = Map.get(params, :activity_id, nil)
325 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
326 |> Utils.maybe_put("id", activity_id),
327 {:ok, activity} <- insert(data, local),
328 :ok <- maybe_federate(activity) do
# Inserts and federates an update activity built from the given params; an explicit
# "id" is set when `:activity_id` is provided. Only `local: false` disables `local`.
# (The lines assembling `data` are not visible in this excerpt.)
333 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
334 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
335 local = !(params[:local] == false)
336 activity_id = params[:activity_id]
345 data <- Utils.maybe_put(data, "id", activity_id),
346 {:ok, activity} <- insert(data, local),
347 :ok <- maybe_federate(activity) do
# Runs do_react_with_emoji/4 inside a database transaction.
# Options read by the worker: `:local` (default true) and `:activity_id`.
352 @spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
353 {:ok, Activity.t(), Object.t()} | {:error, any()}
354 def react_with_emoji(user, object, emoji, options \\ []) do
355 with {:ok, result} <-
356 Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
# Transaction body for react_with_emoji/4: validates that `emoji` is a unicode emoji,
# inserts the reaction activity, records it on the object and federates.
# An invalid emoji yields `{:error, false}`; `{:error, _}` results roll back.
defp do_react_with_emoji(user, object, emoji, options) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id)

  with true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
       data = make_emoji_reaction_data(user, object, emoji, activity_id),
       {:ok, activity} <- insert(data, local),
       {:ok, updated_object} <- add_emoji_reaction_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, updated_object}
  else
    false -> {:error, false}
    {:error, error} -> Repo.rollback(error)
  end
end
# Runs do_unreact_with_emoji/3 inside a database transaction.
# Options read by the worker: `:local` (default true) and `:activity_id`.
376 @spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
377 {:ok, Activity.t(), Object.t()} | {:error, any()}
378 def unreact_with_emoji(user, reaction_id, options \\ []) do
379 with {:ok, result} <-
380 Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
# Transaction body for unreact_with_emoji/3: loads the reaction activity (which must
# belong to `user`), inserts an undo activity, removes the reaction from the object
# and federates. `{:error, _}` results roll the transaction back.
# NOTE(review): a missing reaction or one owned by another actor matches no
# else-clause and therefore raises WithClauseError - confirm that is intended.
defp do_unreact_with_emoji(user, reaction_id, options) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id)
  user_ap_id = user.ap_id

  with %Activity{actor: ^user_ap_id} = reaction <- Activity.get_by_ap_id(reaction_id),
       reacted_object = Object.normalize(reaction),
       undo_data = make_undo_data(user, reaction, activity_id),
       {:ok, undo_activity} <- insert(undo_data, local),
       {:ok, updated_object} <- remove_emoji_reaction_from_object(reaction, reacted_object),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity, updated_object}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
# Runs do_announce/5 inside a database transaction. Requires a user with an AP id and
# an object whose data has an "id". (Part of the function head is not visible here.)
401 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
402 {:ok, Activity.t(), Object.t()} | {:error, any()}
404 %User{ap_id: _} = user,
405 %Object{data: %{"id" => _}} = object,
410 with {:ok, result} <-
411 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
# Transaction body for announce/5: checks the object may be announced by `user`,
# reloads it by id, inserts the Announce activity, records it on the object and
# federates. A non-announceable object yields `{:error, false}`; `{:error, _}`
# results roll the transaction back.
defp do_announce(user, object, activity_id, local, public) do
  with true <- is_announceable?(object, user, public),
       fresh_object = Object.get_by_id(object.id),
       data = make_announce_data(user, fresh_object, activity_id, public),
       {:ok, activity} <- insert(data, local),
       {:ok, updated_object} <- add_announce_to_object(activity, fresh_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, updated_object}
  else
    false -> {:error, false}
    {:error, error} -> Repo.rollback(error)
  end
end
# unannounce runs do_unannounce/4 inside a database transaction.
# (The function head is not visible in this excerpt.)
430 @spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
431 {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
438 with {:ok, result} <-
439 Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
# do_unannounce/4 finds the actor's existing announce of `object`, inserts and federates
# the undo activity, deletes the original announce activity and removes the announce
# from the object. `{:error, _}` results roll the transaction back.
444 defp do_unannounce(actor, object, activity_id, local) do
445 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
446 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
447 {:ok, unannounce_activity} <- insert(unannounce_data, local),
448 :ok <- maybe_federate(unannounce_activity),
449 {:ok, _activity} <- Repo.delete(announce_activity),
450 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
451 {:ok, unannounce_activity, object}
454 {:error, error} -> Repo.rollback(error)
# follow/4 runs do_follow/4 inside a database transaction.
458 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
459 {:ok, Activity.t()} | {:error, any()}
460 def follow(follower, followed, activity_id \\ nil, local \\ true) do
461 with {:ok, result} <-
462 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
# do_follow/4 builds the follow data, inserts it and federates the activity;
# `{:error, _}` results roll the transaction back.
467 defp do_follow(follower, followed, activity_id, local) do
468 with data <- make_follow_data(follower, followed, activity_id),
469 {:ok, activity} <- insert(data, local),
470 :ok <- maybe_federate(activity) do
473 {:error, error} -> Repo.rollback(error)
# unfollow/4 runs do_unfollow/4 inside a database transaction.
477 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
478 {:ok, Activity.t()} | nil | {:error, any()}
479 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
480 with {:ok, result} <-
481 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# do_unfollow/4 takes the latest follow activity between the two users, sets its state
# to "cancelled", then inserts and federates the unfollow activity.
# `{:error, _}` results roll the transaction back.
486 defp do_unfollow(follower, followed, activity_id, local) do
487 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
488 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
489 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
490 {:ok, activity} <- insert(unfollow_data, local),
491 :ok <- maybe_federate(activity) do
495 {:error, error} -> Repo.rollback(error)
# delete/2 runs do_delete/2 inside a database transaction. do_delete/2 has one clause
# per entity kind: a User, an Object with "id" and "actor", and a Tombstone object.
499 @spec delete(User.t() | Object.t(), keyword()) :: {:ok, User.t() | Object.t()} | {:error, any()}
500 def delete(entity, options \\ []) do
501 with {:ok, result} <- Repo.transaction(fn -> do_delete(entity, options) end) do
# User: addresses the user's followers and references the user as a "Person" object.
# NOTE(review): `insert(data, true, true, true)` passes local, fake and
# bypass_actor_check all as true - confirm `fake` is really meant here.
506 defp do_delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _) do
508 "to" => [follower_address],
511 "object" => %{"type" => "Person", "id" => ap_id}
513 {:ok, activity} <- insert(data, true, true, true),
514 :ok <- maybe_federate(activity) do
# Object: options `:local` (default true), `:activity_id` and `:actor` (defaults to the
# object's own actor). Inserts the delete activity, deletes the object, streams out
# participations, adjusts reply/note counters and federates.
519 defp do_delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
520 local = Keyword.get(options, :local, true)
521 activity_id = Keyword.get(options, :activity_id, nil)
522 actor = Keyword.get(options, :actor, actor)
524 user = User.get_cached_by_ap_id(actor)
525 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
527 with create_activity <- Activity.get_create_by_object_ap_id(id),
534 "deleted_activity_id" => create_activity && create_activity.id
536 |> maybe_put("id", activity_id),
537 {:ok, activity} <- insert(data, local, false),
538 {:ok, object, _create_activity} <- Object.delete(object),
539 stream_out_participations(object, user),
540 _ <- decrease_replies_count_if_reply(object),
541 {:ok, _actor} <- decrease_note_count_if_public(user, object),
542 :ok <- maybe_federate(activity) do
# Tombstone: queries activities by object id restricted to type "Delete".
# NOTE(review): this clause sits after the generic Object clause, so a Tombstone whose
# data also carries "actor" would match the clause above instead.
550 defp do_delete(%Object{data: %{"type" => "Tombstone", "id" => ap_id}}, _) do
553 |> Activity.Queries.by_object_id()
554 |> Activity.Queries.by_type("Delete")
# block/4 runs do_block/4 inside a database transaction.
560 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
561 {:ok, Activity.t()} | {:error, any()}
562 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
563 with {:ok, result} <-
564 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
# do_block/4: when `[:activitypub, :unfollow_blocked]` is set and a follow exists, the
# blocker first unfollows the blocked user. The block activity itself is only inserted
# and federated when `[:activitypub, :outgoing_blocks]` is true.
569 defp do_block(blocker, blocked, activity_id, local) do
570 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
571 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
573 if unfollow_blocked do
574 follow_activity = fetch_latest_follow(blocker, blocked)
575 if follow_activity, do: unfollow(blocker, blocked, nil, local)
578 with true <- outgoing_blocks,
579 block_data <- make_block_data(blocker, blocked, activity_id),
580 {:ok, activity} <- insert(block_data, local),
581 :ok <- maybe_federate(activity) do
584 {:error, error} -> Repo.rollback(error)
# unblock/4 runs do_unblock/4 inside a database transaction.
588 @spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
589 {:ok, Activity.t()} | {:error, any()} | nil
590 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
591 with {:ok, result} <-
592 Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
# do_unblock/4 takes the latest block activity between the two users, then inserts and
# federates the matching unblock activity. `{:error, _}` results roll back.
597 defp do_unblock(blocker, blocked, activity_id, local) do
598 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
599 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
600 {:ok, activity} <- insert(unblock_data, local),
601 :ok <- maybe_federate(activity) do
605 {:error, error} -> Repo.rollback(error)
# Creates a report (flag) activity, federates a stripped copy of it and e-mails every
# superuser that has an e-mail address. (The function head is not visible here.)
609 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
# Only an explicit `false` disables `local` / `forward`; nil/missing means true.
619 # only accept false as false value
620 local = !(params[:local] == false)
621 forward = !(params[:forward] == false)
623 additional = params[:additional] || %{}
# Two addressing variants: cc'ing the reported account, or addressing nobody.
# presumably selected by `forward` - the condition is not visible here.
627 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
629 Map.merge(additional, %{"to" => [], "cc" => []})
632 with flag_data <- make_flag_data(params, additional),
633 {:ok, activity} <- insert(flag_data, local),
634 {:ok, stripped_activity} <- strip_report_status_data(activity),
635 :ok <- maybe_federate(stripped_activity) do
636 User.all_superusers()
637 |> Enum.filter(fn user -> not is_nil(user.email) end)
638 |> Enum.each(fn superuser ->
640 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
641 |> Pleroma.Emails.Mailer.deliver_async()
# Inserts and federates an activity moving `origin` to `target`, then enqueues a
# "move_following" background job. The target must list the origin's AP id in
# `also_known_as`; otherwise an error tuple is returned.
# NOTE(review): `in` raises if `target.also_known_as` is nil - verify the schema default.
648 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
649 def move(%User{} = origin, %User{} = target, local \\ true) do
652 "actor" => origin.ap_id,
653 "object" => origin.ap_id,
654 "target" => target.ap_id
657 with true <- origin.ap_id in target.also_known_as,
658 {:ok, activity} <- insert(params, local) do
659 maybe_federate(activity)
661 BackgroundWorker.enqueue("move_following", %{
662 "origin_id" => origin.id,
663 "target_id" => target.id
668 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for the activities of one conversation context, newest first.
# With a "user" in opts the recipients are the user, their followings and the public
# address. Filters on data type and context, respects blocks and excludes poll votes.
673 def fetch_activities_for_context_query(context, opts) do
674 public = [Constants.as_public()]
678 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
681 from(activity in Activity)
682 |> maybe_preload_objects(opts)
683 |> maybe_preload_bookmarks(opts)
684 |> maybe_set_thread_muted_field(opts)
685 |> restrict_blocked(opts)
686 |> restrict_recipients(recipients, opts["user"])
690 "?->>'type' = ? and ?->>'context' = ?",
697 |> exclude_poll_votes(opts)
699 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using fetch_activities_for_context_query/2.
702 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
703 def fetch_activities_for_context(context, opts \\ %{}) do
705 |> fetch_activities_for_context_query(opts)
# Returns the id of the latest activity in a context (or nil, per the spec).
# "skip_preload" is enabled by default here; opts given by the caller take precedence
# because they are the second argument to Map.merge/2.
# NOTE(review): the spec allows a keyword list, but Map.merge/2 requires a map.
709 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
710 FlakeId.Ecto.CompatType.t() | nil
711 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
713 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
@doc """
Fetches a page of activities addressed to the public collection, excluding
unlisted ones. Any "user" entry in `opts` is dropped before querying.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.drop(opts, ["user"])
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
729 @valid_visibilities ~w[direct unlisted public private]
# Narrows `query` to activities whose computed visibility (SQL function
# `activity_visibility/3`) equals the requested `:visibility`, or is one of them when
# a list is given. Opts without a `:visibility` key leave the query untouched.
#
# Invalid values are logged and the query is returned unfiltered. Previously the error
# branches returned the result of `Logger.error/1` (`:ok`), which is not a queryable
# and broke the `|>` chain this function is part of.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by the "exclude_visibilities" option,
# which may be a list or a single value; values outside @valid_visibilities are
# logged. Opts without the key leave the query untouched.
# (The `from`/`where` wrappers around the fragments are not visible in this excerpt.)
769 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
770 when is_list(visibility) do
771 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
776 "activity_visibility(?, ?, ?) = ANY (?)",
784 Logger.error("Could not exclude visibility to #{visibility}")
789 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
790 when visibility in @valid_visibilities do
795 "activity_visibility(?, ?, ?) = ?",
# nil is tolerated explicitly, so a nil "exclude_visibilities" is not logged as an error.
804 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
805 when visibility not in [nil | @valid_visibilities] do
806 Logger.error("Could not exclude visibility to #{visibility}")
810 defp exclude_visibility(query, _visibility), do: query
# Applies the SQL `thread_visibility` check for the reading user's AP id. The first two
# clauses match `skip_thread_containment: true` in the config map or on the user
# (their bodies are not visible here); without a user the query is returned unchanged.
812 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
815 defp restrict_thread_visibility(
817 %{"user" => %User{skip_thread_containment: true}},
822 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
825 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
829 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` ("actor_id") as seen by `reading_user`,
# without restricting the activity type. Recipients depend on "godmode" and the reader.
831 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
834 |> Map.put("user", reading_user)
835 |> Map.put("actor_id", user.ap_id)
838 user_activities_recipients(%{
839 "godmode" => params["godmode"],
840 "reading_user" => reading_user
843 fetch_activities(recipients, params)
# Fetches the Create and Announce activities authored by `user` as seen by
# `reading_user`, passing along the user's pinned activity ids. The reader's
# blocking/muting params are set depending on whether the reader blocks `user`
# (the branch structure is not visible in this excerpt).
847 def fetch_user_activities(user, reading_user, params \\ %{}) do
850 |> Map.put("type", ["Create", "Announce"])
851 |> Map.put("user", reading_user)
852 |> Map.put("actor_id", user.ap_id)
853 |> Map.put("pinned_activity_ids", user.pinned_activities)
856 if User.blocks?(reading_user, user) do
860 |> Map.put("blocking_user", reading_user)
861 |> Map.put("muting_user", reading_user)
865 user_activities_recipients(%{
866 "godmode" => params["godmode"],
867 "reading_user" => reading_user
870 fetch_activities(recipients, params)
# Fetches Create and Announce activities visible to `reading_user`, using offset
# pagination (unlike the keyset default of fetch_activities/3).
874 def fetch_statuses(reading_user, params) do
877 |> Map.put("type", ["Create", "Announce"])
880 user_activities_recipients(%{
881 "godmode" => params["godmode"],
882 "reading_user" => reading_user
885 fetch_activities(recipients, params, :offset)
# Computes the recipient list used when reading a user's activities. "godmode" has its
# own clause (body not visible here). Otherwise the list is the public address, plus
# the reader's AP id and followings in one of the two branches.
889 defp user_activities_recipients(%{"godmode" => true}) do
893 defp user_activities_recipients(%{"reading_user" => reading_user}) do
895 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
897 [Constants.as_public()]
# Keeps only activities whose id is greater than "since_id". A blank or missing
# "since_id" leaves the query unchanged.
defp restrict_since(query, opts) do
  case opts do
    %{"since_id" => ""} -> query
    %{"since_id" => since_id} -> from(a in query, where: a.id > ^since_id)
    _ -> query
  end
end
# Drops activities whose object has any of the "tag_reject" tags (jsonb `?|`).
# The filter needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag_reject(_query, %{"tag_reject" => _rejected, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected})
     when is_list(rejected) and rejected != [] do
  from([_activity, obj] in query,
    where: fragment("not (?)->'tag' \\?| (?)", obj.data, ^rejected)
  )
end

defp restrict_tag_reject(query, _opts), do: query
# Keeps activities whose object has every one of the "tag_all" tags (jsonb `?&`).
# The filter needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag_all(_query, %{"tag_all" => _required, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required})
     when is_list(required) and required != [] do
  from([_activity, obj] in query,
    where: fragment("(?)->'tag' \\?& (?)", obj.data, ^required)
  )
end

defp restrict_tag_all(query, _opts), do: query
# Keeps activities whose object has the given "tag": any of a list (jsonb `?|`) or a
# single string (jsonb `?`). Needs the joined object, so "skip_preload" raises.
defp restrict_tag(_query, %{"tag" => _wanted, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  from([_activity, obj] in query,
    where: fragment("(?)->'tag' \\?| (?)", obj.data, ^tags)
  )
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  from([_activity, obj] in query,
    where: fragment("(?)->'tag' \\? (?)", obj.data, ^single_tag)
  )
end

defp restrict_tag(query, _opts), do: query
# Keeps activities whose recipients overlap (`&&`) the given list. With a user, the
# user's own activities are included as well; an empty list disables the filter.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  from(a in query, where: fragment("? && ?", ^recipients, a.recipients))
end

defp restrict_recipients(query, recipients, user) do
  from(a in query,
    where: fragment("? && ?", ^recipients, a.recipients),
    or_where: a.actor == ^user.ap_id
  )
end
# Keeps only local activities when "local_only" is exactly `true`.
defp restrict_local(query, opts) do
  case opts do
    %{"local_only" => true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
# Keeps only activities authored by "actor_id", when that option is present.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => actor_id} -> from(a in query, where: a.actor == ^actor_id)
    _ -> query
  end
end
# Filters on the activity's data "type": a single string is compared for equality,
# any other value is matched with `ANY(?)` (i.e. treated as a list of types).
defp restrict_type(query, %{"type" => single_type}) when is_binary(single_type) do
  from(a in query, where: fragment("?->>'type' = ?", a.data, ^single_type))
end

defp restrict_type(query, %{"type" => types}) do
  from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))
end

defp restrict_type(query, _opts), do: query
# Keeps only activities whose data "state" equals the "state" option, when present.
defp restrict_state(query, opts) do
  case opts do
    %{"state" => state} ->
      from(a in query, where: fragment("?->>'state' = ?", a.data, ^state))

    _ ->
      query
  end
end
# Keeps activities whose object's "likes" contain the given AP id (jsonb `?`).
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  from([_activity, obj] in query,
    where: fragment("(?)->'likes' \\? (?)", obj.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
# With "only_media" set (true, "true" or "1"), keeps activities whose object's
# "attachment" is not an empty list. Needs the joined object, so "skip_preload" raises.
defp restrict_media(_query, %{"only_media" => _flag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => flag}) when flag in [true, "true", "1"] do
  from([_activity, obj] in query,
    where: fragment("not (?)->'attachment' = (?)", obj.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
# Reply filtering. "exclude_replies" (true, "true" or "1") keeps only objects without
# "inReplyTo". "reply_visibility" => "self" / "following" keep non-replies plus replies
# matching the SQL conditions below, evaluated for the "reply_filtering_user".
# (Most fragment arguments are not visible in this excerpt.)
1021 defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
1023 [_activity, object] in query,
1024 where: fragment("?->>'inReplyTo' is null", object.data)
1028 defp restrict_replies(query, %{
1029 "reply_filtering_user" => user,
1030 "reply_visibility" => "self"
1033 [activity, object] in query,
1036 "?->>'inReplyTo' is null OR ? = ANY(?)",
1044 defp restrict_replies(query, %{
1045 "reply_filtering_user" => user,
1046 "reply_visibility" => "following"
1049 [activity, object] in query,
1052 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
1054 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1055 activity.recipients,
1063 defp restrict_replies(query, _), do: query
# With "exclude_reblogs" set (true, "true" or "1"), drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in [true, "true", "1"] do
  from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
# Hides activities from, or addressed ("to") to, users muted by "muting_user", unless
# "with_muted" is set. The muted AP ids may be passed in via "muted_users_ap_ids" to
# avoid another lookup. Unless "skip_preload" is set, a thread-mute condition is
# applied as well.
1071 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
1073 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
1074 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
1077 from([activity] in query,
1078 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1079 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
# The :thread_mute named binding only exists when preloading was not skipped.
1082 unless opts["skip_preload"] do
1083 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1089 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by "blocking_user": blocked
# actors, blocked recipients, Announces addressed to blocked users, and actors/objects
# from blocked domains unless the actor is in `following_ap_ids`. Blocked AP ids may be
# passed in via "blocked_users_ap_ids". Joins the object when it is not yet bound.
1091 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
1092 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1093 domain_blocks = user.domain_blocks || []
1095 following_ap_ids = User.get_friends_ap_ids(user)
1098 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1101 [activity, object: o] in query,
1102 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1103 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
1106 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1113 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1121 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1130 defp restrict_blocked(query, _), do: query
# Drops activities whose "cc" contains the public address, i.e. unlisted ones
# (a missing "cc" is coalesced to an empty jsonb object).
1132 defp restrict_unlisted(query) do
1137 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1139 ^[Constants.as_public()]
# TODO: once all endpoints are migrated to OpenAPI, compare `pinned` with the boolean
# `true` only; the same applies to `restrict_media/2`, `restrict_replies/2`,
# `restrict_reblogs/2` and `restrict_muted/2`.

# With "pinned" set (true, "true" or "1"), keeps only activities whose id is in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => pinned_ids})
     when pinned in [true, "true", "1"] do
  from(a in query, where: a.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
# Hides Announce activities by actors whose reblogs "muting_user" has muted. The AP ids
# may be passed in via "reblog_muted_users_ap_ids" to avoid another lookup.
1155 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
1156 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
1162 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1170 defp restrict_muted_reblogs(query, _), do: query
# Keeps activities whose actor is a user with a nickname ending in "@<instance>".
# NOTE(review): `instance` is interpolated into a LIKE pattern, so `%` or `_` inside
# it act as wildcards - consider escaping if the value can come from user input.
1172 defp restrict_instance(query, %{"instance" => instance}) do
1177 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1181 from(activity in query, where: activity.actor in ^users)
1184 defp restrict_instance(query, _), do: query
# Drops activities whose object is of type "Answer" (poll votes) unless
# "include_poll_votes" is true. The filter is only applied when the query already has
# the :object named binding.
1186 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1188 defp exclude_poll_votes(query, _) do
1189 if has_named_binding?(query, :object) do
1190 from([activity, object: o] in query,
1191 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Drops the activity whose id equals "exclude_id" (binary ids only).
defp exclude_id(query, %{"exclude_id" => excluded}) when is_binary(excluded) do
  from(a in query, where: a.id != ^excluded)
end

defp exclude_id(query, _opts), do: query
# Preloads the activities' objects unless "skip_preload" is set.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
# Preloads bookmarks for the "user" in opts unless "skip_preload" is set.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
# Preloads report notes only when "preload_report_notes" is exactly `true`.
defp maybe_preload_report_notes(query, opts) do
  case opts do
    %{"preload_report_notes" => true} -> Activity.with_preloaded_report_notes(query)
    _ -> query
  end
end
# Sets the thread-muted field for the "muting_user" (falling back to "user") unless
# "skip_preload" is set.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["muting_user"] || opts["user"])
end
# Orders by id when opts carry an explicit `order: :desc | :asc`; otherwise the
# query's ordering is left alone.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
# Preloads, in a single call, the mute / reblog-mute relationship AP ids of the
# "muting_user" - plus further relationships when "blocking_user" is the same user -
# and returns three opts maps for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2. Keys already present in `opts` win, because `opts` is the
# second argument to Map.merge/2.
1244 defp fetch_activities_query_ap_ids_ops(opts) do
1245 source_user = opts["muting_user"]
1246 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1248 ap_id_relationships =
1249 ap_id_relationships ++
1250 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1256 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1258 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1259 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1261 restrict_muted_reblogs_opts =
1262 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1264 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query by piping through every preload/restrict/exclude
# helper. Each step must return a queryable for the next one. The blocked / muted /
# muted-reblogs steps receive opts enriched with preloaded relationship AP ids.
1267 def fetch_activities_query(recipients, opts \\ %{}) do
1268 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1269 fetch_activities_query_ap_ids_ops(opts)
1272 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1276 |> maybe_preload_objects(opts)
1277 |> maybe_preload_bookmarks(opts)
1278 |> maybe_preload_report_notes(opts)
1279 |> maybe_set_thread_muted_field(opts)
1280 |> maybe_order(opts)
1281 |> restrict_recipients(recipients, opts["user"])
1282 |> restrict_replies(opts)
1283 |> restrict_tag(opts)
1284 |> restrict_tag_reject(opts)
1285 |> restrict_tag_all(opts)
1286 |> restrict_since(opts)
1287 |> restrict_local(opts)
1288 |> restrict_actor(opts)
1289 |> restrict_type(opts)
1290 |> restrict_state(opts)
1291 |> restrict_favorited_by(opts)
1292 |> restrict_blocked(restrict_blocked_opts)
1293 |> restrict_muted(restrict_muted_opts)
1294 |> restrict_media(opts)
1295 |> restrict_visibility(opts)
1296 |> restrict_thread_visibility(opts, config)
1297 |> restrict_reblogs(opts)
1298 |> restrict_pinned(opts)
1299 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1300 |> restrict_instance(opts)
1301 |> Activity.restrict_deactivated_users()
1302 |> exclude_poll_votes(opts)
1303 |> exclude_visibility(opts)
# Fetches one page of activities for `recipients`.
#
# The result of Pleroma.List.memberships/1 for opts["user"] is appended to the
# recipients before the query is built (presumably the lists the user belongs to —
# verify in Pleroma.List). The same memberships are then passed, together with the
# user, to maybe_update_cc/3.
# `pagination` defaults to :keyset and is forwarded to Pagination.fetch_paginated/3.
# NOTE(review): a pipeline step between pagination and maybe_update_cc/3 appears to be
# missing from this excerpt — check the full file before relying on result order.
1306 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1307 list_memberships = Pleroma.List.memberships(opts["user"])
1309 fetch_activities_query(recipients ++ list_memberships, opts)
1310 |> Pagination.fetch_paginated(opts, pagination)
1312 |> maybe_update_cc(list_memberships, opts["user"])
1316 Fetches the activities a user has favourited, ordered by when they were added to favourites (descending Like id).
1318 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1319 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
# NOTE(review): the value piped into by_actor/1 is not visible in this excerpt;
# presumably it identifies `user` — confirm against the full file.
# Narrow to "Like" activities by that actor.
1321 |> Activity.Queries.by_actor()
1322 |> Activity.Queries.by_type("Like")
# Two joins follow; the select below binds them as `object` and `activity`.
1323 |> Activity.with_joined_object()
1324 |> Object.with_joined_activity()
# Return the joined activity with its `object` field set to the joined object,
# rather than returning the Like itself.
1325 |> select([_like, object, activity], %{activity | object: object})
# Order by the id of the Like, descending.
1326 |> order_by([like, _, _], desc: like.id)
# "skip_order" => true is merged into the params given to the paginator —
# presumably so it keeps the ordering set above; verify in Pleroma.Pagination.
# NOTE(review): the remaining fetch_paginated arguments are not visible here.
1327 |> Pagination.fetch_paginated(
1328 Map.merge(params, %{"skip_order" => true}),
# Makes list-addressed activities appear addressed to the viewing user.
#
# When `list_memberships` is a non-empty list and an activity's "bcc" contains one
# of those memberships, the user's AP id is prepended to that activity's "cc".
# Activities with no matching "bcc" entry are returned unchanged. If there are no
# memberships, or the third argument is not a %User{}, the whole list is returned
# unchanged.
#
# Non-emptiness is checked by matching `[_ | _]` instead of calling length/1,
# which walks the entire list just to compare against zero.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # List.wrap/1 keeps "cc" a proper list even when the key is missing (nil)
        # or holds a single bare value; prepending to those directly would build
        # an improper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities whose recipients array either
#   * overlaps `recipients`, or
#   * overlaps `recipients_with_public` and also contains the public address
#     (Constants.as_public()).
# The overlap test is the SQL `&&` operator applied through a fragment.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
# Fetches a page of activities using the bounded recipient condition.
#
# fetch_activities_query/2 is called with an empty recipient list, the recipient
# restriction is then applied by fetch_activities_bounded_query/3, and the result
# is paginated (`pagination` defaults to :keyset).
# NOTE(review): part of the parameter list and whatever follows pagination are not
# visible in this excerpt — confirm against the full file.
1360 def fetch_activities_bounded(
1362 recipients_with_public,
1364 pagination \\ :keyset
1366 fetch_activities_query([], opts)
1367 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1368 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` through Upload.store/2 and, on success, inserts an Object built
# from the stored upload's data.
#
# `opts` is a keyword list forwarded to Upload.store/2; opts[:actor] is also written
# to the data's "actor" key.
# NOTE(review): the condition guarding that Map.put is not visible in this excerpt.
# A non-matching Upload.store/2 result falls out of the `with` unchanged.
1372 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1373 def upload(file, opts \\ []) do
1374 with {:ok, data} <- Upload.store(file, opts) do
1377 Map.put(data, "actor", opts[:actor])
1382 Repo.insert(%Object{data: obj_data})
# Extracts a single URL string from a "url" value that may arrive in several
# shapes. Returns nil when no clause recognises the shape.
1386 @spec get_actor_url(any()) :: binary() | nil
# Already a plain string: use it as is.
1387 defp get_actor_url(url) when is_binary(url), do: url
# A map with a string "href": use the href.
1388 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
# A list of candidates.
# NOTE(review): the body of this clause is not visible in this excerpt — confirm
# how the list is reduced to one URL.
1390 defp get_actor_url(url) when is_list(url) do
# Anything else (including maps without a string "href").
1396 defp get_actor_url(_url), do: nil
# Converts a fetched actor object (a string-keyed map) into the atom-keyed
# attribute map used to build a remote user. Its caller matches the result as
# `{:ok, data}`.
# NOTE(review): several bindings and map entries are not visible in this excerpt;
# the comments below describe only what the visible lines establish.
1398 defp object_to_user_data(data) do
# Built only when the actor's "icon" has a "url"; the URL is wrapped as a
# one-element "url" list of %{"href" => ...}.
1400 data["icon"]["url"] &&
1403 "url" => [%{"href" => data["icon"]["url"]}]
# Same shape, built from the actor's "image".
1407 data["image"]["url"] &&
1410 "url" => [%{"href" => data["image"]["url"]}]
# Attachments of type "PropertyValue", reduced to their "name" and "value" keys.
# NOTE(review): the filter fn has a single clause, so an attachment that is not a
# map with a "type" key raises FunctionClauseError instead of being skipped.
1415 |> Map.get("attachment", [])
1416 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1417 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Tags of type "Emoji" are folded into a map of
# name-with-surrounding-colons-trimmed => icon URL.
1421 |> Map.get("tag", [])
1423 %{"type" => "Emoji"} -> true
1426 |> Enum.reduce(%{}, fn %{"icon" => %{"url" => url}, "name" => name}, acc ->
1427 Map.put(acc, String.trim(name, ":"), url)
# `locked` is read before `data` is rebound to the result of
# Transmogrifier.maybe_fix_user_object/1; everything below reads the rebound object.
1430 locked = data["manuallyApprovesFollowers"] || false
1431 data = Transmogrifier.maybe_fix_user_object(data)
1432 discoverable = data["discoverable"] || false
1433 invisible = data["invisible"] || false
# Actors without a "type" are treated as "Person".
1434 actor_type = data["type"] || "Person"
# The PEM is taken only when "publicKey" is a map holding a string "publicKeyPem".
1437 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1438 data["publicKey"]["publicKeyPem"]
# Likewise, the shared inbox is taken only when "endpoints" is a map holding a
# string "sharedInbox".
1444 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1445 data["endpoints"]["sharedInbox"]
# "url" may be a string, a link map or a list; get_actor_url/1 normalises it.
1452 uri: get_actor_url(data["url"]),
1458 discoverable: discoverable,
1459 invisible: invisible,
1462 follower_address: data["followers"],
1463 following_address: data["following"],
1464 bio: data["summary"],
1465 actor_type: actor_type,
# Defaults to an empty list when the actor has no "alsoKnownAs".
1466 also_known_as: Map.get(data, "alsoKnownAs", []),
1467 public_key: public_key,
1468 inbox: data["inbox"],
1469 shared_inbox: shared_inbox
1472 # nickname can be nil because of virtual actors
# With a "preferredUsername", the nickname is "<preferredUsername>@<host of the id>".
1474 if data["preferredUsername"] do
1478 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1481 Map.put(user_data, :nickname, nil)
# Fetches the user's remote following and follower collections and derives the
# follow counters plus whether each collection is hidden.
#
# Steps, in order: fetch user.following_address, classify it with
# collection_private/1, fetch user.follower_address, classify it likewise.
# Counters come from each collection's "totalItems" via normalize_counter/1, so a
# non-integer value becomes 0.
# `{:error, _}` results from any step are returned unchanged.
# NOTE(review): the wrapper around the result map and any further else-clauses are
# not visible in this excerpt.
1487 def fetch_follow_information_for_user(user) do
1488 with {:ok, following_data} <-
1489 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1490 {:ok, hide_follows} <- collection_private(following_data),
1491 {:ok, followers_data} <-
1492 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1493 {:ok, hide_followers} <- collection_private(followers_data) do
1496 hide_follows: hide_follows,
1497 follower_count: normalize_counter(followers_data["totalItems"]),
1498 following_count: normalize_counter(following_data["totalItems"]),
1499 hide_followers: hide_followers
1502 {:error, _} = e -> e
# Returns `counter` unchanged when it is an integer and 0 for any other value
# (nil, strings, floats, ...).
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Enriches `user_data` with follow counters and hide flags fetched from the remote
# collections, when three preconditions hold. Each precondition is tagged so the
# else-branches can tell which one failed.
# NOTE(review): the bodies of the else-branches and the call that emits the failure
# message are not visible in this excerpt.
1510 def maybe_update_follow_information(user_data) do
# 1. The instance setting [:instance, :external_user_synchronization] is enabled.
1511 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# 2. The actor type is "Person" or "Service".
# NOTE(review): object_to_user_data/1 visibly sets :actor_type — confirm that :type
# is actually present in user_data, otherwise this check can never pass.
1512 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
# 3. Both collection addresses are present; `!!` coerces the result to a boolean.
1514 {:collections_available,
1515 !!(user_data[:following_address] && user_data[:follower_address])},
1517 fetch_follow_information_for_user(user_data) do
# Fetched values override existing entries in user_data[:info] (or start a new map).
1518 info = Map.merge(user_data[:info] || %{}, info)
1521 |> Map.put(:info, info)
1523 {:user_type_check, false} ->
1526 {:collections_available, false} ->
1529 {:enabled, false} ->
# Message for any other failure; it includes the inspected error term.
1534 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Classifies a remote collection as private or not by looking at its "first" page.
# Callers match the result as `{:ok, boolean}`.
#
# Clause 1: "first" is embedded as a map whose "type" is a collection page.
# NOTE(review): the result expression of this clause is not visible in this excerpt.
1541 defp collection_private(%{"first" => %{"type" => type}})
1542 when type in ["CollectionPage", "OrderedCollectionPage"],
# Clause 2: "first" is something else (presumably a reference to the page) and is
# fetched. NOTE(review): the success result is not visible in this excerpt.
1545 defp collection_private(%{"first" => first}) do
1546 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1547 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# A 401/403 response when fetching the page is reported as private.
1550 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
# Other error tuples are returned unchanged.
1551 {:error, _} = e -> e
# Clause 3: no "first" key at all is reported as private.
1556 defp collection_private(_data), do: {:ok, true}
# Runs a raw actor object through MRF.filter/1 and, if that returns `{:ok, data}`,
# converts the filtered object with object_to_user_data/1.
# NOTE(review): the success value and any else-branch are not visible in this excerpt.
1558 def user_data_from_user_object(data) do
1559 with {:ok, data} <- MRF.filter(data),
1560 {:ok, data} <- object_to_user_data(data) do
# Fetches the remote object at `ap_id`, converts it with user_data_from_user_object/1
# and passes the result through maybe_update_follow_information/1.
# NOTE(review): the success value and the return values of the failure branches are
# not visible in this excerpt.
1567 def fetch_and_prepare_user_from_ap_id(ap_id) do
1568 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1569 {:ok, data} <- user_data_from_user_object(data),
# A bare `<-` pattern always matches, so this step cannot fail the `with`.
1570 data <- maybe_update_follow_information(data) do
# A deleted object is logged at debug level only.
1573 {:error, "Object has been deleted"} = e ->
1574 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Another failure branch logs the same message text at error level.
1578 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Creates or refreshes the local record for the remote actor at `ap_id`.
# NOTE(review): the branch structure between the visible lines (else/if and the
# values piped into the changesets) is not visible in this excerpt.
1583 def make_user_from_ap_id(ap_id) do
# Look the actor up in the user cache first.
1584 user = User.get_cached_by_ap_id(ap_id)
# A known user that is not yet AP-enabled is upgraded instead of re-fetched.
1586 if user && !User.ap_enabled?(user) do
1587 Transmogrifier.upgrade_user_from_ap_id(ap_id)
# Otherwise fetch and prepare fresh user data from the remote.
1589 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
# One branch builds a changeset with the fetched data and updates the record and cache.
1592 |> User.remote_user_changeset(data)
1593 |> User.update_and_set_cache()
# Another branch builds a changeset through remote_user_changeset/1.
1596 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id found.
# Any WebFinger result without a non-nil "ap_id" yields
# {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Predicate for filtering out broken threads: delegates to
# entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity. Currently it only applies the
# broken-thread check from contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1624 def fetch_direct_messages_query do
1626 |> restrict_type(%{"type" => "Create"})
1627 |> restrict_visibility(%{visibility: "direct"})
1628 |> order_by([activity], asc: activity.id)