1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
# Announce clause: reads "to"/"cc"/"bcc" (each defaulting to []), looks up the
# announcing actor, and filters the concatenated recipients. A recipient that
# resolves to a cached user is kept only when User.following?(user, actor).
# NOTE(review): the other `case` branch and the return expression are not
# visible in this view; callers destructure the result as `{recipients, _, _}`.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
# Create clause: the actor is appended to to/cc/bcc and duplicates are removed.
# Note the default for a missing "actor" is [], which then ends up wrapped as
# [[]] in the concatenation.
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Fallback clause: plain concatenation of to/cc/bcc, no filtering or dedup.
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
# Checks that a non-nil actor AP id belongs to a user whose `deactivated`
# field is `false`. insert/4 requires this function to return `true`.
# NOTE(review): `user <- ...` matches anything, including nil, so an unknown
# actor reaches `user.deactivated` on nil — confirm this cannot happen.
# NOTE(review): the success/failure branches are not visible in this view.
70 defp check_actor_is_active(actor) do
71 if not is_nil(actor) do
72 with user <- User.get_cached_by_ap_id(actor),
73 false <- user.deactivated do
# Enforces the configured `[:instance, :remote_limit]` on an object's content.
#
# Returns `true` when the content length is within the limit, or when there is
# no textual content to measure; returns `false` when the content is too long.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  # String.length/1 only accepts binaries. Remote payloads are untrusted, so
  # "content" may be any JSON value; guarding on is_binary/1 keeps a malformed
  # object from raising here.
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

# No string content present (missing, nil, or not a string): nothing to limit.
defp check_remote_limit(_), do: true
# Bumps the actor's note counter when `object` is public; otherwise the actor
# is handed back untouched as `{:ok, actor}`.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Lowers the actor's note counter when `object` is public; otherwise the actor
# is handed back untouched as `{:ok, actor}`.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
# Bumps the reply counter of the object referenced by "inReplyTo" when the
# nested object of the given data is public (is_public?/1). Anything that does
# not match the first clause yields :noop.
# NOTE(review): part of the first clause's pattern is not visible in this view.
98 def increase_replies_count_if_reply(%{
99 "object" => %{"inReplyTo" => reply_ap_id} = object,
102 if is_public?(object) do
103 Object.increase_replies_count(reply_ap_id)
107 def increase_replies_count_if_reply(_create_data), do: :noop
# Lowers the reply counter of the parent referenced by "inReplyTo" when the
# object's data is public. Returns nil for non-public replies and :noop for
# anything that is not a reply object.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = data}) do
  if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
# When the nested object carries both "inReplyTo" and "name", registers a vote
# for option `name` on the object at `reply_ap_id` via
# Object.increase_vote_count/3. Anything else yields :noop.
# NOTE(review): the rest of the pattern (where `actor` is bound) is not
# visible in this view.
119 def increase_poll_votes_if_vote(%{
120 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
124 Object.increase_vote_count(reply_ap_id, name, actor)
127 def increase_poll_votes_if_vote(_create_data), do: :noop
# Inserts `object` as an Activity row. `meta` must contain `:local`
# (Keyword.fetch!/2 raises otherwise). Recipients come from get_recipients/1
# and the actor from object["actor"].
# NOTE(review): the visible success value is the 3-tuple
# `{:ok, activity, meta}`, which the @spec below does not describe — confirm.
129 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
130 def persist(object, meta) do
131 with local <- Keyword.fetch!(meta, :local),
132 {recipients, _, _} <- get_recipients(object),
134 Repo.insert(%Activity{
137 recipients: recipients,
138 actor: object["actor"]
140 {:ok, activity, meta}
# Validates and stores an activity map. Visible steps, in order:
#   1. continue only if Activity.normalize/1 finds no existing activity
#      (an existing %Activity{} is handled in the else branch);
#   2. fill in defaults (lazy_put_activity_defaults/2);
#   3. require an active actor unless `bypass_actor_check` is set;
#   4. enforce the remote content limit (tagged :remote_limit_error on failure);
#   5. run the MRF filter chain;
#   6. compute recipients;
#   7. when `fake` is true, divert to the else branch, which builds an
#      in-memory %Activity{} instead;
#   8. run the containment check, then insert the embedded object;
#   9. insert the Activity row, splice in the child object if present, and
#      enqueue a "fetch_data_for_activity" background job.
# NOTE(review): several lines of this function are not visible in this view,
# including the return values of the else branches.
144 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
145 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
146 with nil <- Activity.normalize(map),
147 map <- lazy_put_activity_defaults(map, fake),
148 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
149 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
150 {:ok, map} <- MRF.filter(map),
151 {recipients, _, _} = get_recipients(map),
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
156 Repo.insert(%Activity{
160 recipients: recipients
163 # Splice in the child object if we have one.
165 if not is_nil(object) do
166 Map.put(activity, :object, object)
171 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
175 %Activity{} = activity ->
178 {:fake, true, map, recipients} ->
179 activity = %Activity{
183 recipients: recipients,
187 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# Post-insert fan-out for an activity: creates notifications, creates or bumps
# the conversation on behalf of the activity's actor, and streams the
# resulting participations.
# NOTE(review): not every line of this function is visible in this view.
195 def notify_and_stream(activity) do
196 Notification.create_notifications(activity)
198 conversation = create_or_bump_conversation(activity, activity.actor)
199 participations = get_participations(conversation)
201 stream_out_participations(participations)
# Creates or bumps the conversation for `activity`, resolves `actor` to a
# cached %User{}, and marks the conversation as read for that user.
# The last `with` clause is a bare call (no `<-`), so its result is not
# pattern-matched.
# NOTE(review): the `do` body is not visible in this view;
# get_participations/1 expects `{:ok, conversation}` on success.
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor),
207 Participation.mark_as_read(user, conversation) do
# Returns the participations of a successfully created/bumped conversation,
# force-reloading the association; any other input yields [].
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
# Preloads each participation's user and pushes the list to the
# "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
# Re-streams a conversation's participations for the context of `object`:
# looks up the conversation by the object's "context", preloads its
# participations, and streams them only if a latest activity id can still be
# found for that context with `user` as the blocking user.
# Inputs without a "context" yield :noop.
# NOTE(review): some option lines passed to the lookup are not visible here.
228 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
229 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
230 conversation = Repo.preload(conversation, :participations),
232 fetch_latest_activity_id_for_context(conversation.ap_id, %{
234 "blocking_user" => user
236 if last_activity_id do
237 stream_out_participations(conversation.participations)
242 def stream_out_participations(_, _), do: :noop
# Streams Create/Announce/Delete activities to the topics computed by
# Topics.get_activity_topics/1. Other activity types take the fallback clause.
# NOTE(review): the fallback clause's body is not visible in this view.
244 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
245 when data_type in ["Create", "Announce", "Delete"] do
247 |> Topics.get_activity_topics()
248 |> Streamer.stream(activity)
251 def stream_out(_activity) do
# Runs do_create/2 inside a database transaction. A committed transaction is
# unwrapped to the inner result; a rollback is returned unchanged.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of create/2. Builds the Create data, inserts it, then (for
# real inserts) updates the reply and poll-vote counters, bumps the actor's
# note count when public, notifies/streams, and federates.
# `fake` inserts and the :benchmark env ("quick insert") leave the chain early
# through tagged tuples handled in the else branch. Other failures roll the
# transaction back.
# NOTE(review): the data-building call and else-branch bodies are not fully
# visible in this view.
262 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
263 additional = params[:additional] || %{}
264 # only accept false as false value
265 local = !(params[:local] == false)
266 published = params[:published]
267 quick_insert? = Config.get([:env]) == :benchmark
271 %{to: to, actor: actor, published: published, context: context, object: object},
274 {:ok, activity} <- insert(create_data, local, fake),
275 {:fake, false, activity} <- {:fake, fake, activity},
276 _ <- increase_replies_count_if_reply(create_data),
277 _ <- increase_poll_votes_if_vote(create_data),
278 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
279 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
280 _ <- notify_and_stream(activity),
281 :ok <- maybe_federate(activity) do
284 {:quick_insert, true, activity} ->
287 {:fake, true, activity} ->
291 Repo.rollback(message)
# Inserts a "listen" activity built from to/actor/context/object (plus
# optional :additional and :published), then notifies/streams and federates.
# `local` is true unless params[:local] is exactly `false`.
# NOTE(review): the data-building call and the result line are not visible.
295 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
296 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
297 additional = params[:additional] || %{}
298 # only accept false as false value
299 local = !(params[:local] == false)
300 published = params[:published]
304 %{to: to, actor: actor, published: published, context: context, object: object},
307 {:ok, activity} <- insert(listen_data, local),
308 _ <- notify_and_stream(activity),
309 :ok <- maybe_federate(activity) do
# Publishes an "Accept" activity; see accept_or_reject/2 for the params.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
# Publishes a "Reject" activity; see accept_or_reject/2 for the params.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
# Shared implementation of accept/1 and reject/1. Builds an activity of
# `type` from to/actor/object, adding an "id" via Utils.maybe_put/3 when
# :activity_id is given. It is then inserted, notified/streamed and federated.
# Unlike create/listen, `local` defaults to true through Map.get/3, so an
# explicit nil is passed through as nil.
324 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
325 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
326 local = Map.get(params, :local, true)
327 activity_id = Map.get(params, :activity_id, nil)
330 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
331 |> Utils.maybe_put("id", activity_id),
332 {:ok, activity} <- insert(data, local),
333 _ <- notify_and_stream(activity),
334 :ok <- maybe_federate(activity) do
# Inserts an update activity built from to/cc/actor/object, optionally with an
# explicit "id", then notifies/streams and federates it.
# `local` is true unless params[:local] is exactly `false`.
# NOTE(review): the construction of `data` is not visible in this view.
339 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
340 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
341 local = !(params[:local] == false)
342 activity_id = params[:activity_id]
351 data <- Utils.maybe_put(data, "id", activity_id),
352 {:ok, activity} <- insert(data, local),
353 _ <- notify_and_stream(activity),
354 :ok <- maybe_federate(activity) do
# Runs do_unreact_with_emoji/3 inside a database transaction. A committed
# transaction is unwrapped to the inner result; a rollback is returned as-is.
@spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
        {:ok, Activity.t(), Object.t()} | {:error, any()}
def unreact_with_emoji(user, reaction_id, options \\ []) do
  case Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of unreact_with_emoji/3.
#
# Loads the reaction activity by AP id and requires it to belong to `user`.
# It then inserts the undo activity, removes the reaction from the object,
# notifies/streams and federates. `{:error, reason}` from any step rolls the
# transaction back. Options: `:local` (default true) and `:activity_id`
# (default nil).
defp do_unreact_with_emoji(user, reaction_id, options) do
  is_local = Keyword.get(options, :local, true)
  undo_id = Keyword.get(options, :activity_id, nil)
  actor_ap_id = user.ap_id

  with %Activity{actor: ^actor_ap_id} = reaction <- Activity.get_by_ap_id(reaction_id),
       reacted_object = Object.normalize(reaction),
       undo_data = make_undo_data(user, reaction, undo_id),
       {:ok, undo_activity} <- insert(undo_data, is_local),
       {:ok, updated_object} <- remove_emoji_reaction_from_object(reaction, reacted_object),
       _ = notify_and_stream(undo_activity),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity, updated_object}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
# Runs do_unlike/4 inside a database transaction. A committed transaction is
# unwrapped to the inner result; a rollback is returned as-is.
@spec unlike(User.t(), Object.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t(), Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unlike(actor, object, activity_id, local) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of unlike/4. Finds the actor's existing like on `object`,
# inserts the undo activity, deletes the like, removes it from the object's
# like data, then notifies/streams and federates.
# `{:error, error}` from any step rolls the transaction back.
# NOTE(review): at least one else clause is not visible in this view (the
# @spec of unlike/4 also allows `{:ok, Object.t()}`).
394 defp do_unlike(actor, object, activity_id, local) do
395 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
396 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
397 {:ok, unlike_activity} <- insert(unlike_data, local),
398 {:ok, _activity} <- Repo.delete(like_activity),
399 {:ok, object} <- remove_like_from_object(like_activity, object),
400 _ <- notify_and_stream(unlike_activity),
401 :ok <- maybe_federate(unlike_activity) do
402 {:ok, unlike_activity, like_activity, object}
405 {:error, error} -> Repo.rollback(error)
# Public entry point for announcing (boosting) an object. Requires a user
# with an `ap_id` and an object whose data has an "id", and delegates to
# do_announce/5 inside a database transaction.
# NOTE(review): the function head (including defaults for the optional
# arguments) is only partially visible in this view.
409 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
410 {:ok, Activity.t(), Object.t()} | {:error, any()}
412 %User{ap_id: _} = user,
413 %Object{data: %{"id" => _}} = object,
418 with {:ok, result} <-
419 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
# Transaction body of announce/5.
#
# Checks that the object may be announced by `user` and re-reads it by id.
# It then inserts the Announce, records it on the object, notifies/streams
# and federates. A failed announceability check yields `{:error, false}`
# without rolling back; `{:error, reason}` from a later step rolls back.
defp do_announce(user, object, activity_id, local, public) do
  with true <- is_announceable?(object, user, public),
       fresh_object = Object.get_by_id(object.id),
       data = make_announce_data(user, fresh_object, activity_id, public),
       {:ok, announce} <- insert(data, local),
       {:ok, updated_object} <- add_announce_to_object(announce, fresh_object),
       _ = notify_and_stream(announce),
       :ok <- maybe_federate(announce) do
    {:ok, announce, updated_object}
  else
    false -> {:error, false}
    {:error, reason} -> Repo.rollback(reason)
  end
end
# Public entry point for undoing an announce; delegates to do_unannounce/4
# inside a database transaction.
# NOTE(review): the function head is not visible in this view.
439 @spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
440 {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
447 with {:ok, result} <-
448 Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
# Transaction body of unannounce/4. Finds the actor's existing announce of
# `object`, inserts the undo activity, notifies/streams and federates it, and
# only then deletes the announce and removes it from the object.
# NOTE(review): federation happens before the delete here, whereas do_unlike/4
# deletes first — confirm the ordering is intentional.
# NOTE(review): at least one else clause is not visible in this view.
453 defp do_unannounce(actor, object, activity_id, local) do
454 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
455 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
456 {:ok, unannounce_activity} <- insert(unannounce_data, local),
457 _ <- notify_and_stream(unannounce_activity),
458 :ok <- maybe_federate(unannounce_activity),
459 {:ok, _activity} <- Repo.delete(announce_activity),
460 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
461 {:ok, unannounce_activity, object}
464 {:error, error} -> Repo.rollback(error)
# Runs do_follow/4 inside a database transaction. A committed transaction is
# unwrapped to the inner result; a rollback is returned as-is.
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of follow/4: builds the follow data, inserts it,
# notifies/streams and federates. `{:error, error}` rolls back.
# NOTE(review): the success value is not visible in this view.
477 defp do_follow(follower, followed, activity_id, local) do
478 with data <- make_follow_data(follower, followed, activity_id),
479 {:ok, activity} <- insert(data, local),
480 _ <- notify_and_stream(activity),
481 :ok <- maybe_federate(activity) do
484 {:error, error} -> Repo.rollback(error)
# Runs do_unfollow/4 inside a database transaction. A committed transaction
# is unwrapped to the inner result; a rollback is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of unfollow/4. Takes the latest follow activity between
# the two users, marks its state "cancelled", inserts the undo activity built
# from it, then notifies/streams and federates.
# `{:error, error}` rolls the transaction back.
# NOTE(review): the success value and at least one else clause are not
# visible in this view (the @spec of unfollow/4 also allows nil).
497 defp do_unfollow(follower, followed, activity_id, local) do
498 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
499 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
500 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
501 {:ok, activity} <- insert(unfollow_data, local),
502 _ <- notify_and_stream(activity),
503 :ok <- maybe_federate(activity) do
507 {:error, error} -> Repo.rollback(error)
# Runs do_block/4 inside a database transaction. A committed transaction is
# unwrapped to the inner result; a rollback is returned as-is.
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of block/4.
# If [:activitypub, :unfollow_blocked] is set and the blocker currently
# follows the blocked user, an unfollow is issued first; its result is
# ignored. The block activity itself is only built, inserted, streamed and
# federated when [:activitypub, :outgoing_blocks] is `true`.
# NOTE(review): the only visible else clause matches `{:error, error}` —
# confirm what is intended when `outgoing_blocks` is not `true`.
520 defp do_block(blocker, blocked, activity_id, local) do
521 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
522 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
524 if unfollow_blocked do
525 follow_activity = fetch_latest_follow(blocker, blocked)
526 if follow_activity, do: unfollow(blocker, blocked, nil, local)
529 with true <- outgoing_blocks,
530 block_data <- make_block_data(blocker, blocked, activity_id),
531 {:ok, activity} <- insert(block_data, local),
532 _ <- notify_and_stream(activity),
533 :ok <- maybe_federate(activity) do
536 {:error, error} -> Repo.rollback(error)
# Runs do_unblock/4 inside a database transaction. A committed transaction is
# unwrapped to the inner result; a rollback is returned as-is.
@spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()} | nil
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
    {:ok, outcome} -> outcome
    failure -> failure
  end
end
# Transaction body of unblock/4: takes the latest block activity between the
# two users, inserts the undo activity built from it, then notifies/streams
# and federates. `{:error, error}` rolls the transaction back.
# NOTE(review): the success value and at least one else clause are not
# visible in this view (the @spec of unblock/4 also allows nil).
549 defp do_unblock(blocker, blocked, activity_id, local) do
550 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
551 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
552 {:ok, activity} <- insert(unblock_data, local),
553 _ <- notify_and_stream(activity),
554 :ok <- maybe_federate(activity) do
558 {:error, error} -> Repo.rollback(error)
# Files a report (flag) activity.
# `local` and `forward` are true unless explicitly `false`. The report is
# addressed with an empty "to"; "cc" is either the reported account's AP id
# or empty. After insertion the report-status data is stripped before
# federating, and every superuser that has an email address is sent an admin
# report email asynchronously.
# NOTE(review): the function head and the condition choosing between the two
# "cc" variants are not visible in this view.
562 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
572 # only accept false as false value
573 local = !(params[:local] == false)
574 forward = !(params[:forward] == false)
576 additional = params[:additional] || %{}
580 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
582 Map.merge(additional, %{"to" => [], "cc" => []})
585 with flag_data <- make_flag_data(params, additional),
586 {:ok, activity} <- insert(flag_data, local),
587 {:ok, stripped_activity} <- strip_report_status_data(activity),
588 _ <- notify_and_stream(activity),
589 :ok <- maybe_federate(stripped_activity) do
590 User.all_superusers()
591 |> Enum.filter(fn user -> not is_nil(user.email) end)
592 |> Enum.each(fn superuser ->
594 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
595 |> Pleroma.Emails.Mailer.deliver_async()
# Publishes an account move from `origin` to `target`.
# The move is only accepted when the target lists the origin's AP id in
# `also_known_as`; otherwise an error tuple with an explanatory message is
# returned. On success the activity is inserted, streamed and federated, and
# a "move_following" background job is enqueued with both user ids.
# NOTE(review): the remaining keys of `params` and the success value are not
# visible in this view.
602 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
603 def move(%User{} = origin, %User{} = target, local \\ true) do
606 "actor" => origin.ap_id,
607 "object" => origin.ap_id,
608 "target" => target.ap_id
611 with true <- origin.ap_id in target.also_known_as,
612 {:ok, activity} <- insert(params, local),
613 _ <- notify_and_stream(activity) do
614 maybe_federate(activity)
616 BackgroundWorker.enqueue("move_following", %{
617 "origin_id" => origin.id,
618 "target_id" => target.id
623 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities in a conversation `context`, newest first.
# With a user in opts["user"], recipients are the user's own AP id, everyone
# they follow, and the public collection. The query applies the optional
# preloads, block restrictions, a type/context match on the activity data,
# and excludes poll votes.
# NOTE(review): the recipient fallback and the fragment arguments are not
# visible in this view.
628 def fetch_activities_for_context_query(context, opts) do
629 public = [Constants.as_public()]
633 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
636 from(activity in Activity)
637 |> maybe_preload_objects(opts)
638 |> maybe_preload_bookmarks(opts)
639 |> maybe_set_thread_muted_field(opts)
640 |> restrict_blocked(opts)
641 |> restrict_recipients(recipients, opts["user"])
645 "?->>'type' = ? and ?->>'context' = ?",
652 |> exclude_poll_votes(opts)
654 |> order_by([activity], desc: activity.id)
# Fetches the activities of a conversation context using
# fetch_activities_for_context_query/2.
# NOTE(review): the start and end of the pipeline are not visible here.
657 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
658 def fetch_activities_for_context(context, opts \\ %{}) do
660 |> fetch_activities_for_context_query(opts)
# Returns the id of the latest activity in a context, or nil (per the @spec).
# Preloads are skipped by default ("skip_preload" => true); because `opts` is
# merged last, callers can override that.
# NOTE(review): the remaining pipeline steps are not visible in this view.
664 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
665 FlakeId.Ecto.CompatType.t() | nil
666 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
668 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
# Fetches public, non-unlisted activities with pagination.
# Any "user" in opts is dropped first, so the query is built without it; the
# only recipient is the public collection.
674 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
675 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
676 opts = Map.drop(opts, ["user"])
678 [Constants.as_public()]
679 |> fetch_activities_query(opts)
680 |> restrict_unlisted()
681 |> Pagination.fetch_paginated(opts, pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
684 @valid_visibilities ~w[direct unlisted public private]
# Restricts a query by the :visibility option (atom key), comparing against
# the `activity_visibility(actor, recipients, data)` SQL function.
#   * a list: every element must be a valid visibility, else an error is logged;
#   * a single valid value: exact match;
#   * a single invalid value: an error is logged;
#   * no :visibility key: the query is returned unchanged.
# NOTE(review): what the error paths return is not visible in this view.
686 defp restrict_visibility(query, %{visibility: visibility})
687 when is_list(visibility) do
688 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
694 "activity_visibility(?, ?, ?) = ANY (?)",
704 Logger.error("Could not restrict visibility to #{visibility}")
708 defp restrict_visibility(query, %{visibility: visibility})
709 when visibility in @valid_visibilities do
713 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
717 defp restrict_visibility(_query, %{visibility: visibility})
718 when visibility not in @valid_visibilities do
719 Logger.error("Could not restrict visibility to #{visibility}")
722 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2, driven by the "exclude_visibilities"
# option (string key). Accepts a list of valid visibilities or a single valid
# value; invalid input is logged. Unlike restrict_visibility/2, a nil value is
# explicitly not treated as invalid and falls through to the unchanged query.
# NOTE(review): the negation in the fragments and the error-path return
# values are not visible in this view.
724 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
725 when is_list(visibility) do
726 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
731 "activity_visibility(?, ?, ?) = ANY (?)",
739 Logger.error("Could not exclude visibility to #{visibility}")
744 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
745 when visibility in @valid_visibilities do
750 "activity_visibility(?, ?, ?) = ?",
759 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
760 when visibility not in [nil | @valid_visibilities] do
761 Logger.error("Could not exclude visibility to #{visibility}")
765 defp exclude_visibility(query, _visibility), do: query
# Applies thread containment through the `thread_visibility(ap_id, id)` SQL
# function for the requesting user. The first two clauses match when
# containment is switched off, either in the config map
# (skip_thread_containment: true) or on the user; without a user the query is
# returned unchanged.
# NOTE(review): the bodies of the first two clauses are not visible here.
767 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
770 defp restrict_thread_visibility(
772 %{"user" => %User{skip_thread_containment: true}},
777 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
780 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
784 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`, without
# restricting the activity type. Sets "user" and "actor_id" in the params and
# derives recipients via user_activities_recipients/1.
# NOTE(review): some pipeline lines are not visible in this view.
786 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
789 |> Map.put("user", reading_user)
790 |> Map.put("actor_id", user.ap_id)
793 user_activities_recipients(%{
794 "godmode" => params["godmode"],
795 "reading_user" => reading_user
798 fetch_activities(recipients, params)
# Fetches the Create/Announce activities of `user` as seen by `reading_user`,
# passing along the user's pinned activity ids. Whether `reading_user` blocks
# `user` decides if the blocking/muting options are added.
# NOTE(review): which branch adds "blocking_user"/"muting_user" is not
# visible in this view.
802 def fetch_user_activities(user, reading_user, params \\ %{}) do
805 |> Map.put("type", ["Create", "Announce"])
806 |> Map.put("user", reading_user)
807 |> Map.put("actor_id", user.ap_id)
808 |> Map.put("pinned_activity_ids", user.pinned_activities)
811 if User.blocks?(reading_user, user) do
815 |> Map.put("blocking_user", reading_user)
816 |> Map.put("muting_user", reading_user)
820 user_activities_recipients(%{
821 "godmode" => params["godmode"],
822 "reading_user" => reading_user
825 fetch_activities(recipients, params)
# Fetches Create/Announce activities visible to `reading_user` using :offset
# pagination (the sibling fetchers use the default :keyset).
# NOTE(review): some pipeline lines are not visible in this view.
829 def fetch_statuses(reading_user, params) do
832 |> Map.put("type", ["Create", "Announce"])
835 user_activities_recipients(%{
836 "godmode" => params["godmode"],
837 "reading_user" => reading_user
840 fetch_activities(recipients, params, :offset)
# Computes the recipient list used to scope a user timeline.
# With a reading user: the public collection, the reader's own AP id and
# everyone the reader follows. Otherwise only the public collection.
# NOTE(review): the "godmode" clause body and the condition separating the
# two results below are not visible in this view.
844 defp user_activities_recipients(%{"godmode" => true}) do
848 defp user_activities_recipients(%{"reading_user" => reading_user}) do
850 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
852 [Constants.as_public()]
# Keeps only activities with an id greater than "since_id". An empty string
# or a missing key leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
# Drops activities whose object carries any of the rejected tags.
# Needs the joined object, so it refuses to run with "skip_preload" => true.
# An empty or non-list "tag_reject" leaves the query untouched.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
# Keeps only activities whose object carries all of the given tags.
# Needs the joined object, so it refuses to run with "skip_preload" => true.
# An empty or non-list "tag_all" leaves the query untouched.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
# Keeps only activities whose object carries the given tag: any of them for a
# list, exactly that one for a string.
# Needs the joined object, so it refuses to run with "skip_preload" => true.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
# Keeps activities addressed to any of `recipients` (array overlap). When a
# user is given, activities authored by that user are included as well.
# An empty recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
# Keeps only local activities when "local_only" is exactly `true`.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
# Keeps only activities authored by the actor given in "actor_id".
defp restrict_actor(query, %{"actor_id" => actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
# Keeps only activities of the requested type: an exact match for a string,
# membership for any other value (expected to be a list of type names).
defp restrict_type(query, %{"type" => type_name}) when is_binary(type_name) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type_name))
end

defp restrict_type(query, %{"type" => type_names}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type_names))
end

defp restrict_type(query, _), do: query
# Keeps only activities whose data has the given "state".
defp restrict_state(query, %{"state" => state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _), do: query
# Keeps only activities whose object lists the given AP id in its "likes".
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
# Keeps only activities whose object has a non-empty "attachment" list when
# "only_media" is truthy (true, "true" or "1").
# Needs the joined object, so it refuses to run with "skip_preload" => true.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => flag}) when flag in [true, "true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
# Filters replies out of a timeline.
#   * "exclude_replies" truthy: keep only objects without "inReplyTo";
#   * "reply_visibility" => "self": keep non-replies, plus replies matched by
#     the `? = ANY(?)` test in the fragment;
#   * "reply_visibility" => "following": keep non-replies, plus replies
#     matched against the filtering user's own AP id and cached friends.
# Anything else leaves the query untouched.
# NOTE(review): most fragment arguments are not visible in this view, so the
# exact columns compared cannot be confirmed from here.
976 defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
978 [_activity, object] in query,
979 where: fragment("?->>'inReplyTo' is null", object.data)
983 defp restrict_replies(query, %{
984 "reply_filtering_user" => user,
985 "reply_visibility" => "self"
988 [activity, object] in query,
991 "?->>'inReplyTo' is null OR ? = ANY(?)",
999 defp restrict_replies(query, %{
1000 "reply_filtering_user" => user,
1001 "reply_visibility" => "following"
1004 [activity, object] in query,
1007 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
1009 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1010 activity.recipients,
1018 defp restrict_replies(query, _), do: query
# Drops Announce activities when "exclude_reblogs" is truthy
# (true, "true" or "1").
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in [true, "true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
# Hides activities from muted users unless "with_muted" is truthy.
# The mute list comes from opts["muted_users_ap_ids"] when preloaded,
# otherwise from User.muted_users_ap_ids/1. Activities are dropped when their
# actor is muted or when their "to" contains a muted AP id. Unless preloads
# are skipped, rows with a thread-mute entry are filtered out as well.
# NOTE(review): the return value of the skip_preload branch is not visible.
1026 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
1028 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
1029 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
1032 from([activity] in query,
1033 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1034 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
1037 unless opts["skip_preload"] do
1038 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1044 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by the blocking user.
# The block list comes from opts["blocked_users_ap_ids"] when preloaded,
# otherwise from User.blocked_users_ap_ids/1. The object join is added only
# if the query does not already have an :object binding.
# Visible conditions: the actor is not blocked; no recipient is blocked; an
# Announce does not target a blocked user; and two domain-block tests that
# compare the host part (split_part(..., '/', 3)) with an exemption list.
# NOTE(review): the fragment arguments are not visible in this view, so which
# list feeds each test (domain blocks vs. followed users) is unconfirmed.
1046 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
1047 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1048 domain_blocks = user.domain_blocks || []
1050 following_ap_ids = User.get_friends_ap_ids(user)
1053 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1056 [activity, object: o] in query,
1057 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1058 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
1061 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1068 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1076 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1085 defp restrict_blocked(query, _), do: query
# Drops unlisted activities, i.e. those whose "cc" (an empty JSON object when
# absent) contains the public collection address.
# NOTE(review): the column passed to the fragment is not visible here.
1087 defp restrict_unlisted(query) do
1092 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1094 ^[Constants.as_public()]
1099 # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
1100 # the same for `restrict_media/2`, `restrict_replies/2`, 'restrict_reblogs/2'
1101 # and `restrict_muted/2`
# Keeps only the activities listed in "pinned_activity_ids" when "pinned" is
# truthy (true, "true" or "1"); otherwise the query is left untouched.
defp restrict_pinned(query, %{"pinned" => flag, "pinned_activity_ids" => pinned_ids})
     when flag in [true, "true", "1"] do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _), do: query
# Hides Announce activities made by users whose reblogs the muting user has
# muted. The list comes from opts["reblog_muted_users_ap_ids"] when
# preloaded, otherwise from User.reblog_muted_users_ap_ids/1.
# NOTE(review): the fragment arguments are not visible in this view.
1110 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
1111 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
1117 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1125 defp restrict_muted_reblogs(query, _), do: query
# Keeps only activities authored by users of the given instance. Users are
# selected by a nickname LIKE "%@<instance>" match and the activity actor
# must be in that set.
# NOTE(review): how `users` is built (selected field, query execution) is not
# visible in this view.
1127 defp restrict_instance(query, %{"instance" => instance}) do
1132 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1136 from(activity in query, where: activity.actor in ^users)
1139 defp restrict_instance(query, _), do: query
# Hides poll votes (objects of type "Answer") unless "include_poll_votes" is
# `true`. The filter is only applied when the query already has the :object
# named binding; otherwise the query is returned unchanged.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(
      query,
      [_activity, object: o],
      fragment("not(?->>'type' = ?)", o.data, "Answer")
    )
  else
    query
  end
end
# Drops the activity with the given id when "exclude_id" is a string.
defp exclude_id(query, %{"exclude_id" => excluded_id}) when is_binary(excluded_id),
  do: where(query, [activity], activity.id != ^excluded_id)

defp exclude_id(query, _), do: query
# Preloads the activity's object unless "skip_preload" is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
# Preloads the bookmark of opts["user"] unless "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
# Preloads report notes only when "preload_report_notes" is `true`.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for the muting user (falling back to
# opts["user"]) unless "skip_preload" is `true`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
# Orders by id when the :order option (atom key) is :desc or :asc; any other
# input leaves the query's ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
# Loads the muting user's relationship AP ids with a single
# User.outgoing_relationships_ap_ids/2 call and returns three option maps
# (for restrict_blocked, restrict_muted, restrict_muted_reblogs), each
# carrying its preloaded list. Mutes and reblog mutes are requested when
# opts["muting_user"] is set. `opts` is merged last, so caller-supplied lists
# take precedence over the preloaded ones.
# NOTE(review): what is added when the blocking user equals the muting user
# is not visible in this view.
1199 defp fetch_activities_query_ap_ids_ops(opts) do
1200 source_user = opts["muting_user"]
1201 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1203 ap_id_relationships =
1204 ap_id_relationships ++
1205 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1211 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1213 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1214 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1216 restrict_muted_reblogs_opts =
1217 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1219 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main timeline query by piping through every optional preload,
# ordering step and restriction in this module. Block/mute/reblog-mute
# restrictions receive option maps with preloaded relationship lists (see
# fetch_activities_query_ap_ids_ops/1). Thread containment additionally gets
# the instance-level `skip_thread_containment` setting.
# NOTE(review): the head of the pipeline is not visible in this view.
1222 def fetch_activities_query(recipients, opts \\ %{}) do
1223 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1224 fetch_activities_query_ap_ids_ops(opts)
1227 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1231 |> maybe_preload_objects(opts)
1232 |> maybe_preload_bookmarks(opts)
1233 |> maybe_preload_report_notes(opts)
1234 |> maybe_set_thread_muted_field(opts)
1235 |> maybe_order(opts)
1236 |> restrict_recipients(recipients, opts["user"])
1237 |> restrict_replies(opts)
1238 |> restrict_tag(opts)
1239 |> restrict_tag_reject(opts)
1240 |> restrict_tag_all(opts)
1241 |> restrict_since(opts)
1242 |> restrict_local(opts)
1243 |> restrict_actor(opts)
1244 |> restrict_type(opts)
1245 |> restrict_state(opts)
1246 |> restrict_favorited_by(opts)
1247 |> restrict_blocked(restrict_blocked_opts)
1248 |> restrict_muted(restrict_muted_opts)
1249 |> restrict_media(opts)
1250 |> restrict_visibility(opts)
1251 |> restrict_thread_visibility(opts, config)
1252 |> restrict_reblogs(opts)
1253 |> restrict_pinned(opts)
1254 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1255 |> restrict_instance(opts)
1256 |> Activity.restrict_deactivated_users()
1257 |> exclude_poll_votes(opts)
1258 |> exclude_visibility(opts)
# Fetches a page of activities for `recipients`, extended with the list
# memberships of opts["user"], and post-processes the page with
# maybe_update_cc/3.
# NOTE(review): one pipeline step between pagination and maybe_update_cc/3 is
# not visible in this view.
1261 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1262 list_memberships = Pleroma.List.memberships(opts["user"])
1264 fetch_activities_query(recipients ++ list_memberships, opts)
1265 |> Pagination.fetch_paginated(opts, pagination)
1267 |> maybe_update_cc(list_memberships, opts["user"])
@doc """
Fetches the activities favourited by `user`.

Results are ordered by the id of the `Like` activity, descending, i.e. by when
the favourite was added rather than by the favourited activity itself.
`"skip_order"` is forced to `true` in the params given to pagination
(presumably so that pagination keeps this ordering; confirm in `Pagination`).
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc: like.id)

  page_params = Map.put(params, "skip_order", true)

  Pagination.fetch_paginated(likes_query, page_params, pagination, :object_activity)
end
# For every activity whose "bcc" contains at least one of `list_memberships`,
# prepends the given user's AP id to the activity's "cc". Activities without a
# non-empty "bcc" list, or whose "bcc" shares nothing with `list_memberships`,
# are returned untouched. When `list_memberships` is empty or the third
# argument is not a `%User{}`, the activities are returned as they are.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" can be missing (the updater then receives nil) or a bare value;
        # wrapping keeps the result a proper list instead of `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
@doc """
Narrows `query` to activities whose `recipients` column either overlaps (`&&`)
`recipients`, or overlaps `recipients_with_public` while also containing the
value of `Constants.as_public()`.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public in activity.recipients)
  )
end
@doc """
Fetches one page of activities bounded by `recipients` and
`recipients_with_public` (see `fetch_activities_bounded_query/3`).

The base query is built with an empty recipient list; the paginated result is
reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` built from the
stored data.

When `opts[:actor]` is set, it is recorded under the `"actor"` key of the
object data. Any result of `Upload.store/2` other than `{:ok, data}` is
returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      actor = opts[:actor]
      object_data = if actor, do: Map.put(data, "actor", actor), else: data

      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
# Extracts a URL string from an actor's "url" value: a binary is returned as
# is, a map yields its binary "href", a list is resolved through its first
# element, and anything else (including an empty list) gives nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    url when is_binary(url) -> url
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
# Converts a remote actor object (string-keyed map) into the atom-keyed map of
# user attributes and returns it as `{:ok, user_data}`.
#
# The avatar, banner, profile fields, emoji and `locked` flag are read from the
# object as received; everything after the `Transmogrifier.maybe_fix_user_object/1`
# call is read from the fixed-up object.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # The object comes from a remote server: accept a missing, nil or
  # single-object "attachment" and skip entries that are not "PropertyValue"
  # maps instead of raising on the first malformed one.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Same tolerance for "tag": only well-formed Emoji tags (icon url present,
  # binary name) contribute; on duplicate names the last one wins.
  emojis =
    data["tag"]
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} when is_binary(name) ->
        [{String.trim(name, ":"), url}]

      _ ->
        []
    end)
    |> Map.new()

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }

  {:ok, user_data}
end
@doc """
Fetches the remote `following` and `followers` collections of `user` and
derives follow counters and privacy flags from them.

On success returns `{:ok, map}` with the keys `:hide_follows`,
`:follower_count`, `:following_count` and `:hide_followers`. `{:error, _}`
results of any step are returned unchanged; any other unexpected result is
wrapped as `{:error, result}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
# Keeps an integer counter as is; any other value (nil, string, ...) becomes 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
@doc """
Merges freshly fetched follow information into `user_data[:info]` when
external user synchronization is enabled, the `:type` is `"Person"` or
`"Service"`, and both collection addresses are present.

`user_data` is returned unchanged when one of those checks yields `false`.
Any other failure is logged and `user_data` is likewise returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): object_to_user_data/1 in this file stores the actor type under
  # :actor_type; confirm that callers actually provide :type here.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged = Map.merge(user_data[:info] || %{}, fetched)

    Map.put(user_data, :info, merged)
  else
    # Only an explicit `false` is silent; e.g. an unset (nil) config value is logged below.
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
# Tells whether a remote collection should be treated as private.
#
# * "first" embedded as a (Ordered)CollectionPage        -> {:ok, false}
# * "first" given as a reference: it is fetched; a page   -> {:ok, false},
#   an error carrying an HTTP 401/403 response            -> {:ok, true},
#   other {:error, _} results are passed through and anything else is
#   wrapped as {:error, result}
# * no "first" at all                                     -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs a remote actor object through `MRF.filter/1` and converts the filtered
object into user attributes.

Returns `{:ok, user_data}`; whatever a failing step returned is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes,
including follow information when `maybe_update_follow_information/1` adds it.

Returns `{:ok, user_data}`. On failure the failing result is logged (at debug
level for `{:error, "Object has been deleted"}`, at error level otherwise) and
returned wrapped in `{:error, _}`, so an `{:error, reason}` failure comes
back as `{:error, {:error, reason}}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = failure ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}

    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
@doc """
Creates or refreshes the local user for the remote actor `ap_id`.

A cached user for which `User.ap_enabled?/1` is false is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched: an
existing cached user is updated from the fetched data, a new one is inserted
and cached. A failed fetch is returned wrapped as `{:error, result}`.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        {:error, failure}
    end
  end
end
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from the
returned `"ap_id"`.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
@doc """
Filter for broken threads: returns the result of
`entire_thread_visible_for_user?/2` for `activity` and `user`.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processing check for a single activity; currently this is only the
broken-thread check of `contain_broken_threads/2`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1579 def fetch_direct_messages_query do
1581 |> restrict_type(%{"type" => "Create"})
1582 |> restrict_visibility(%{visibility: "direct"})
1583 |> order_by([activity], asc: activity.id)