1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
# Gathers the "to", "cc" and "bcc" addresses (each defaulting to []) and filters the
# combined list with the per-recipient check below.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
# The announcing actor is resolved once and reused for every recipient check.
41 actor = User.get_cached_by_ap_id(data["actor"])
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
# A recipient that resolves to a cached user is kept only when
# User.following?(user, actor) is truthy.
47 user -> User.following?(user, actor)
# NOTE(review): the other case branch and the return value are not visible in this
# excerpt; callers destructure a 3-tuple `{recipients, _, _}` - confirm against the full file.
# Recipients of a "Create" activity: every address in "to", "cc" and "bcc" plus the
# actor itself, de-duplicated.
#
# Returns `{recipients, to, cc}`.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # List.wrap/1 maps a missing actor to [] instead of injecting an empty list as a
  # bogus recipient entry.
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end
# Fallback clause: recipients are the plain concatenation of "to", "cc" and "bcc"
# (each defaulting to []); nothing is filtered or de-duplicated.
#
# Returns `{recipients, to, cc}`.
defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  {Enum.concat([to, cc, bcc]), to, cc}
end
# Returns true when `actor` is nil (nothing to check) or resolves to a cached user
# that is not deactivated; returns false otherwise.
#
# Matching on the struct avoids crashing on `nil.deactivated` when the actor AP id
# does not resolve to any cached user.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  match?(%User{deactivated: false}, User.get_cached_by_ap_id(actor))
end
# Checks the length of the embedded object's "content" against the configured
# `[:instance, :remote_limit]`. Input without non-nil content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
# Calls User.increase_note_count/1 for `actor` when `is_public?(object)` holds;
# otherwise returns `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Calls User.decrease_note_count/1 for `actor` when `is_public?(object)` holds;
# otherwise returns `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
# Bumps the reply counter of the object referenced by "inReplyTo", but only when
# `is_public?/1` accepts the embedded object. Input that does not match the first
# clause is a no-op (`:noop`).
99 def increase_replies_count_if_reply(%{
100 "object" => %{"inReplyTo" => reply_ap_id} = object,
# NOTE(review): the rest of this pattern is not visible in this excerpt.
103 if is_public?(object) do
104 Object.increase_replies_count(reply_ap_id)
108 def increase_replies_count_if_reply(_create_data), do: :noop
# Lowers the reply counter of the object referenced by "inReplyTo" when the replying
# object is public. Returns nil for a non-public reply and `:noop` for any input that
# is not an %Object{} carrying "inReplyTo".
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply_data}) do
  if is_public?(reply_data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
# Records a poll vote by calling Object.increase_vote_count/3 with the "inReplyTo" id,
# the answer "name" and `actor`. Input that does not match the first clause is a no-op.
120 def increase_poll_votes_if_vote(%{
121 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
# NOTE(review): the remainder of this pattern (including where `actor` is bound) is not
# visible in this excerpt.
125 Object.increase_vote_count(reply_ap_id, name, actor)
128 def increase_poll_votes_if_vote(_create_data), do: :noop
# Builds an %Activity{} from `object` and hands it to Repo.insert/1.
# `meta` must contain `:local` (Keyword.fetch!/2 raises otherwise).
# NOTE(review): the success value below is the 3-tuple `{:ok, activity, meta}`, which the
# 2-tuple in the @spec does not describe - the spec looks out of date.
130 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
131 def persist(object, meta) do
132 with local <- Keyword.fetch!(meta, :local),
# Only the recipient list of the 3-tuple is used here.
133 {recipients, _, _} <- get_recipients(object),
135 Repo.insert(%Activity{
138 recipients: recipients,
139 actor: object["actor"]
141 {:ok, activity, meta}
# Validates and stores an activity map.
#
# Arguments:
#   map                - the activity data (must be a map)
#   local              - passed through by callers; defaults to true
#   fake               - when true the `{:fake, true, ...}` branch below is taken
#   bypass_actor_check - when true, check_actor_is_active/1 is skipped
145 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
146 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
# Proceed only when Activity.normalize/1 finds nothing for this map; an already
# normalizable activity falls through to the `%Activity{} = activity ->` branch.
147 with nil <- Activity.normalize(map),
148 map <- lazy_put_activity_defaults(map, fake),
149 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
# Tagged so a failure of this step can be told apart from the others.
150 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
151 {:ok, map} <- MRF.filter(map),
152 {recipients, _, _} = get_recipients(map),
# A fake insert deliberately fails this match and is handled in the
# `{:fake, true, map, recipients}` branch.
153 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
154 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
155 {:ok, map, object} <- insert_full_object(map) do
161 recipients: recipients
164 |> maybe_create_activity_expiration()
166 # Splice in the child object if we have one.
168 if not is_nil(object) do
169 Map.put(activity, :object, object)
# Follow-up work after the insert: background job, notifications, conversation
# bookkeeping and streaming of the affected participations.
174 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
176 Notification.create_notifications(activity)
178 conversation = create_or_bump_conversation(activity, map["actor"])
179 participations = get_participations(conversation)
181 stream_out_participations(participations)
184 %Activity{} = activity ->
187 {:fake, true, map, recipients} ->
# An %Activity{} struct is built in memory for the fake case.
# NOTE(review): most of its fields are not visible in this excerpt.
188 activity = %Activity{
192 recipients: recipients,
196 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# For a successful insert whose data carries "expires_at", registers an expiration via
# ActivityExpiration.create/2. Any other result is passed through untouched.
# NOTE(review): the value returned on success is not visible in this excerpt.
204 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
205 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
210 defp maybe_create_activity_expiration(result), do: result
# Calls Conversation.create_or_bump_for/1 for the activity and, when `actor` resolves
# to a cached %User{}, marks that user's participation as read.
# NOTE(review): the return value is not visible in this excerpt; get_participations/1
# expects `{:ok, conversation}` on success.
212 defp create_or_bump_conversation(activity, actor) do
213 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
214 %User{} = user <- User.get_cached_by_ap_id(actor),
# Bare expression inside `with`: its result is ignored, so it cannot fail the chain.
215 Participation.mark_as_read(user, conversation) do
# Returns the force-reloaded participations of a conversation given as
# `{:ok, conversation}`; any other input yields [].
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
# Preloads the `:user` association of the given participations and pushes them to the
# "participation" stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
# Two-argument variant: looks up the conversation for the object's "context" and
# re-streams its participations when fetch_latest_activity_id_for_context/2 returns
# an id for the given user. Any other input is a no-op (`:noop`).
236 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
237 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
238 conversation = Repo.preload(conversation, :participations),
240 fetch_latest_activity_id_for_context(conversation.ap_id, %{
# The lookup is run with `user` as the blocking user.
242 "blocking_user" => user
# NOTE(review): the binding of `last_activity_id` is not visible in this excerpt.
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
250 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed by
# Topics.get_activity_topics/1.
252 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
253 when data_type in ["Create", "Announce", "Delete"] do
255 |> Topics.get_activity_topics()
256 |> Streamer.stream(activity)
# Every other activity type is handled by this catch-all clause.
# NOTE(review): its body is not visible in this excerpt.
259 def stream_out(_activity) do
# Public entry point: runs do_create/2 inside a database transaction.
263 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
264 def create(params, fake \\ false) do
265 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body for create/2. `params` must contain :to, :actor, :context and
# :object; :additional, :local and :published are optional.
270 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
271 additional = params[:additional] || %{}
272 # only accept false as false value
273 local = !(params[:local] == false)
274 published = params[:published]
# True only when the :env config value is :benchmark.
275 quick_insert? = Config.get([:env]) == :benchmark
279 %{to: to, actor: actor, published: published, context: context, object: object},
282 {:ok, activity} <- insert(create_data, local, fake),
# Tagged tuples let the else-branches tell a fake or quick insert apart from a
# genuine failure.
283 {:fake, false, activity} <- {:fake, fake, activity},
284 _ <- increase_replies_count_if_reply(create_data),
285 _ <- increase_poll_votes_if_vote(create_data),
286 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
287 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
288 :ok <- maybe_federate(activity) do
291 {:quick_insert, true, activity} ->
294 {:fake, true, activity} ->
# Rolls the surrounding transaction back.
# NOTE(review): the pattern binding `message` is not visible in this excerpt.
298 Repo.rollback(message)
# Inserts an activity built from :to, :actor, :context and :object (plus optional
# :additional, :local and :published) and federates it.
# NOTE(review): the construction of `listen_data` and the return value are not visible
# in this excerpt.
302 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
303 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
304 additional = params[:additional] || %{}
305 # only accept false as false value
306 local = !(params[:local] == false)
307 published = params[:published]
311 %{to: to, actor: actor, published: published, context: context, object: object},
314 {:ok, activity} <- insert(listen_data, local),
315 :ok <- maybe_federate(activity) do
# Creates an "Accept" activity; thin wrapper around accept_or_reject/2.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
# Creates a "Reject" activity; thin wrapper around accept_or_reject/2.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
# Shared implementation of accept/1 and reject/1: builds an activity of the given
# `type` from :to, :actor and :object, inserts it and federates it.
# :local defaults to true; :activity_id, when present, becomes the activity "id".
# NOTE(review): the return value is not visible in this excerpt.
330 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
331 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
332 local = Map.get(params, :local, true)
333 activity_id = Map.get(params, :activity_id, nil)
336 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
337 |> Utils.maybe_put("id", activity_id),
338 {:ok, activity} <- insert(data, local),
339 :ok <- maybe_federate(activity) do
# Inserts and federates an activity built from :to, :cc, :actor and :object.
# :local is treated as true unless it is exactly `false`; :activity_id, when present,
# is stored as the activity "id".
# NOTE(review): the construction of `data` and the return value are not visible in this
# excerpt.
344 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
346 local = !(params[:local] == false)
347 activity_id = params[:activity_id]
356 data <- Utils.maybe_put(data, "id", activity_id),
357 {:ok, activity} <- insert(data, local),
358 :ok <- maybe_federate(activity) do
# Public entry point: runs do_react_with_emoji/4 inside a database transaction.
# Options read: :local (default true) and :activity_id (default nil).
363 @spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
364 {:ok, Activity.t(), Object.t()} | {:error, any()}
365 def react_with_emoji(user, object, emoji, options \\ []) do
366 with {:ok, result} <-
367 Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
# Transaction body: validates the emoji, inserts the reaction activity, records the
# reaction on the object and federates the activity.
372 defp do_react_with_emoji(user, object, emoji, options) do
373 with local <- Keyword.get(options, :local, true),
374 activity_id <- Keyword.get(options, :activity_id, nil),
375 true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
376 reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
377 {:ok, activity} <- insert(reaction_data, local),
378 {:ok, object} <- add_emoji_reaction_to_object(activity, object),
379 :ok <- maybe_federate(activity) do
380 {:ok, activity, object}
# A `false` result is returned as `{:error, false}` without calling Repo.rollback/1.
382 false -> {:error, false}
383 {:error, error} -> Repo.rollback(error)
# Public entry point: runs do_unreact_with_emoji/3 inside a database transaction.
# Options read: :local (default true) and :activity_id (default nil).
387 @spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
388 {:ok, Activity.t(), Object.t()} | {:error, any()}
389 def unreact_with_emoji(user, reaction_id, options \\ []) do
390 with {:ok, result} <-
391 Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
# Transaction body: inserts an undo activity for the reaction and removes the reaction
# from the object.
396 defp do_unreact_with_emoji(user, reaction_id, options) do
397 with local <- Keyword.get(options, :local, true),
398 activity_id <- Keyword.get(options, :activity_id, nil),
399 user_ap_id <- user.ap_id,
# The pinned `^user_ap_id` only matches a reaction whose actor is `user`.
400 %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
401 object <- Object.normalize(reaction_activity),
402 unreact_data <- make_undo_data(user, reaction_activity, activity_id),
403 {:ok, activity} <- insert(unreact_data, local),
404 {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
405 :ok <- maybe_federate(activity) do
406 {:ok, activity, object}
408 {:error, error} -> Repo.rollback(error)
# Public entry point: runs do_unlike/4 inside a database transaction.
412 @spec unlike(User.t(), Object.t(), String.t() | nil, boolean()) ::
413 {:ok, Activity.t(), Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
414 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
415 with {:ok, result} <-
416 Repo.transaction(fn -> do_unlike(actor, object, activity_id, local) end) do
# Transaction body: requires an existing like by `actor`, inserts the unlike activity,
# deletes the like activity, updates the object and federates the unlike.
421 defp do_unlike(actor, object, activity_id, local) do
422 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
423 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
424 {:ok, unlike_activity} <- insert(unlike_data, local),
425 {:ok, _activity} <- Repo.delete(like_activity),
426 {:ok, object} <- remove_like_from_object(like_activity, object),
427 :ok <- maybe_federate(unlike_activity) do
428 {:ok, unlike_activity, like_activity, object}
# NOTE(review): at least one else-branch between here and the rollback is not visible in
# this excerpt.
431 {:error, error} -> Repo.rollback(error)
# Public entry point for announcing an object: runs do_announce/5 inside a database
# transaction.
# NOTE(review): the `def` line and the trailing parameters of the head are not visible
# in this excerpt.
435 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
436 {:ok, Activity.t(), Object.t()} | {:error, any()}
438 %User{ap_id: _} = user,
439 %Object{data: %{"id" => _}} = object,
444 with {:ok, result} <-
445 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
# Transaction body: checks is_announceable?/3, inserts the announce activity, records
# the announce on the object and federates the activity.
450 defp do_announce(user, object, activity_id, local, public) do
451 with true <- is_announceable?(object, user, public),
452 announce_data <- make_announce_data(user, object, activity_id, public),
453 {:ok, activity} <- insert(announce_data, local),
454 {:ok, object} <- add_announce_to_object(activity, object),
455 :ok <- maybe_federate(activity) do
456 {:ok, activity, object}
# A `false` result is returned as `{:error, false}` without calling Repo.rollback/1.
458 false -> {:error, false}
459 {:error, error} -> Repo.rollback(error)
# Public entry point for undoing an announce: runs do_unannounce/4 inside a database
# transaction.
# NOTE(review): the function head is not visible in this excerpt.
463 @spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
464 {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
471 with {:ok, result} <-
472 Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
# Transaction body: requires an existing announce by `actor`, inserts and federates the
# unannounce activity, then deletes the announce and updates the object.
477 defp do_unannounce(actor, object, activity_id, local) do
478 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
479 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
480 {:ok, unannounce_activity} <- insert(unannounce_data, local),
# Unlike do_unlike/4, federation happens before the original activity is deleted.
481 :ok <- maybe_federate(unannounce_activity),
482 {:ok, _activity} <- Repo.delete(announce_activity),
483 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
484 {:ok, unannounce_activity, object}
487 {:error, error} -> Repo.rollback(error)
# Public entry point: runs do_follow/4 inside a database transaction.
491 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
492 {:ok, Activity.t()} | {:error, any()}
493 def follow(follower, followed, activity_id \\ nil, local \\ true) do
494 with {:ok, result} <-
495 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
# Transaction body: builds the follow data, inserts it and federates the activity.
# An `{:error, _}` from any step rolls the transaction back.
500 defp do_follow(follower, followed, activity_id, local) do
501 with data <- make_follow_data(follower, followed, activity_id),
502 {:ok, activity} <- insert(data, local),
503 :ok <- maybe_federate(activity) do
506 {:error, error} -> Repo.rollback(error)
# Public entry point: runs do_unfollow/4 inside a database transaction.
510 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
511 {:ok, Activity.t()} | nil | {:error, any()}
512 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
513 with {:ok, result} <-
514 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body: requires an existing follow activity, sets its state to
# "cancelled", then inserts and federates the unfollow activity.
519 defp do_unfollow(follower, followed, activity_id, local) do
520 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
521 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
522 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
523 {:ok, activity} <- insert(unfollow_data, local),
524 :ok <- maybe_federate(activity) do
528 {:error, error} -> Repo.rollback(error)
# Public entry point: runs the matching do_delete/2 clause inside a database transaction.
532 @spec delete(User.t() | Object.t(), keyword()) :: {:ok, User.t() | Object.t()} | {:error, any()}
533 def delete(entity, options \\ []) do
534 with {:ok, result} <- Repo.transaction(fn -> do_delete(entity, options) end) do
# User clause: builds an activity addressed to the user's follower address whose object
# is the user ("Person").
539 defp do_delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _) do
541 "to" => [follower_address],
544 "object" => %{"type" => "Person", "id" => ap_id}
# Per insert/4's argument order this is local = true, fake = true,
# bypass_actor_check = true.
546 {:ok, activity} <- insert(data, true, true, true),
547 :ok <- maybe_federate(activity) do
# Object clause. Options read: :local (default true), :activity_id (default nil) and
# :actor (defaults to the object's own "actor").
552 defp do_delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
553 local = Keyword.get(options, :local, true)
554 activity_id = Keyword.get(options, :activity_id, nil)
555 actor = Keyword.get(options, :actor, actor)
557 user = User.get_cached_by_ap_id(actor)
# The delete is addressed to the object's "to" and "cc" recipients.
558 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
560 with create_activity <- Activity.get_create_by_object_ap_id(id),
# `create_activity` may be nil, hence the `&&` guard.
567 "deleted_activity_id" => create_activity && create_activity.id
569 |> maybe_put("id", activity_id),
570 {:ok, activity} <- insert(data, local, false),
571 {:ok, object, _create_activity} <- Object.delete(object),
# Bare expression inside `with`: its result is ignored.
572 stream_out_participations(object, user),
573 _ <- decrease_replies_count_if_reply(object),
574 {:ok, _actor} <- decrease_note_count_if_public(user, object),
575 :ok <- maybe_federate(activity) do
# Tombstone clause: looks up the existing "Delete" activity for the object id.
# NOTE(review): the start and end of this pipeline are not visible in this excerpt.
583 defp do_delete(%Object{data: %{"type" => "Tombstone", "id" => ap_id}}, _) do
586 |> Activity.Queries.by_object_id()
587 |> Activity.Queries.by_type("Delete")
# Public entry point: runs do_block/4 inside a database transaction.
593 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
594 {:ok, Activity.t()} | {:error, any()}
595 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
596 with {:ok, result} <-
597 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
# Transaction body, driven by two settings under :activitypub:
#   :unfollow_blocked - when truthy, an existing follow of `blocked` is undone first
#   :outgoing_blocks  - the block activity is only created when this is `true`
602 defp do_block(blocker, blocked, activity_id, local) do
603 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
604 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
606 if unfollow_blocked do
607 follow_activity = fetch_latest_follow(blocker, blocked)
# The result of unfollow/4 is not checked.
608 if follow_activity, do: unfollow(blocker, blocked, nil, local)
611 with true <- outgoing_blocks,
612 block_data <- make_block_data(blocker, blocked, activity_id),
613 {:ok, activity} <- insert(block_data, local),
614 :ok <- maybe_federate(activity) do
617 {:error, error} -> Repo.rollback(error)
# Public entry point: runs do_unblock/4 inside a database transaction.
621 @spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
622 {:ok, Activity.t()} | {:error, any()} | nil
623 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
624 with {:ok, result} <-
625 Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
# Transaction body: requires an existing block activity, then inserts and federates the
# unblock activity.
630 defp do_unblock(blocker, blocked, activity_id, local) do
631 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
632 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
633 {:ok, activity} <- insert(unblock_data, local),
634 :ok <- maybe_federate(activity) do
638 {:error, error} -> Repo.rollback(error)
# Creates a report ("flag") activity, federates a stripped copy and e-mails every
# superuser that has an e-mail address.
# NOTE(review): the function head and the condition choosing between the two
# `additional` maps are not visible in this excerpt.
642 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
652 # only accept false as false value
653 local = !(params[:local] == false)
654 forward = !(params[:forward] == false)
656 additional = params[:additional] || %{}
# One variant addresses the reported account in "cc"; the other addresses nobody.
660 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
662 Map.merge(additional, %{"to" => [], "cc" => []})
665 with flag_data <- make_flag_data(params, additional),
666 {:ok, activity} <- insert(flag_data, local),
# Federation uses the result of strip_report_status_data/1, not the stored activity.
667 {:ok, stripped_activity} <- strip_report_status_data(activity),
668 :ok <- maybe_federate(stripped_activity) do
669 User.all_superusers()
670 |> Enum.filter(fn user -> not is_nil(user.email) end)
671 |> Enum.each(fn superuser ->
673 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
674 |> Pleroma.Emails.Mailer.deliver_async()
# Inserts an activity with `origin` as actor and object and `target` as target, federates
# it and enqueues a "move_following" background job.
# Fails with an error tuple unless `origin.ap_id` is listed in `target.also_known_as`.
681 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
682 def move(%User{} = origin, %User{} = target, local \\ true) do
685 "actor" => origin.ap_id,
686 "object" => origin.ap_id,
687 "target" => target.ap_id
690 with true <- origin.ap_id in target.also_known_as,
691 {:ok, activity} <- insert(params, local) do
# The result of maybe_federate/1 is not checked here.
692 maybe_federate(activity)
694 BackgroundWorker.enqueue("move_following", %{
695 "origin_id" => origin.id,
696 "target_id" => target.id
701 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for the activities of one conversation context, newest id first.
# Options read here: "user"; the remaining helpers receive `opts` as a whole.
706 def fetch_activities_for_context_query(context, opts) do
707 public = [Constants.as_public()]
# With a user, the recipient list is the user, everything returned by User.following/1
# and the public address.
# NOTE(review): the branch used without a user is not visible in this excerpt.
711 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
714 from(activity in Activity)
715 |> maybe_preload_objects(opts)
716 |> maybe_preload_bookmarks(opts)
717 |> maybe_set_thread_muted_field(opts)
718 |> restrict_blocked(opts)
719 |> restrict_recipients(recipients, opts["user"])
# Filters on the activity type and context.
# NOTE(review): the bound values are not visible in this excerpt.
723 "?->>'type' = ? and ?->>'context' = ?",
730 |> exclude_poll_votes(opts)
732 |> order_by([activity], desc: activity.id)
# Runs fetch_activities_for_context_query/2 for `context`.
# NOTE(review): the final step that executes the query is not visible in this excerpt.
735 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
736 def fetch_activities_for_context(context, opts \\ %{}) do
738 |> fetch_activities_for_context_query(opts)
# Same query with "skip_preload" defaulting to true (a caller-supplied value wins,
# because `opts` is the second argument of Map.merge/2).
# NOTE(review): how the single id is selected is not visible in this excerpt.
742 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
743 FlakeId.Ecto.CompatType.t() | nil
744 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
746 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
# Fetches one page of activities addressed to the public collection, with the
# restrict_unlisted/1 filter applied. Any "user" entry in `opts` is dropped before
# the query is built.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.drop(opts, ["user"])
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
# The visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
762 @valid_visibilities ~w[direct unlisted public private]
# List form: every entry must be a valid visibility; the query then keeps activities
# whose activity_visibility(...) is any of the given values.
764 defp restrict_visibility(query, %{visibility: visibility})
765 when is_list(visibility) do
766 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
772 "activity_visibility(?, ?, ?) = ANY (?)",
# NOTE(review): `visibility` is a list here; interpolating it relies on String.Chars for
# lists and can raise for entries that are not chardata.
782 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: keep activities whose activity_visibility(...) equals it.
786 defp restrict_visibility(query, %{visibility: visibility})
787 when visibility in @valid_visibilities do
791 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Rejects a `:visibility` filter that is not one of @valid_visibilities.
#
# The offending value is logged with inspect/1 so that terms which do not implement
# String.Chars (maps, tuples, ...) cannot crash the logging call itself.
# NOTE: this clause returns the result of Logger.error/1, not a query.
defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
end

# No `:visibility` option given: the query is returned unchanged.
defp restrict_visibility(query, _visibility), do: query
# List form of the "exclude_visibilities" option: every entry must be a valid visibility.
# NOTE(review): only the SQL fragment is visible in this excerpt; how it is negated to
# exclude rather than select is not shown.
802 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
803 when is_list(visibility) do
804 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
809 "activity_visibility(?, ?, ?) = ANY (?)",
# NOTE(review): `visibility` is a list here; interpolating it can raise for entries that
# are not chardata.
817 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
822 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
823 when visibility in @valid_visibilities do
828 "activity_visibility(?, ?, ?) = ?",
# Handles an "exclude_visibilities" value that is not one of @valid_visibilities: the
# problem is logged and the query is returned without any exclusion applied.
#
# The offending value is logged with inspect/1 so that terms which do not implement
# String.Chars (maps, tuples, ...) cannot crash the logging call itself.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

# No "exclude_visibilities" option given: the query is returned unchanged.
defp exclude_visibility(query, _visibility), do: query
# Applies the thread_visibility(...) SQL check for the requesting user.
# Clauses matching `skip_thread_containment: true` come first, either in the config map
# (third argument) or on the user itself.
# NOTE(review): their bodies are not visible in this excerpt.
845 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
848 defp restrict_thread_visibility(
850 %{"user" => %User{skip_thread_containment: true}},
# With a user: keep activities for which thread_visibility(user ap_id, activity id) is true.
855 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
858 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
# Without a user the query is returned unchanged.
862 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`: "user" is set to the
# reader and "actor_id" to the author's ap_id. No "type" filter is added here.
864 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
867 |> Map.put("user", reading_user)
868 |> Map.put("actor_id", user.ap_id)
# Recipients depend on the reader and on the "godmode" flag.
871 user_activities_recipients(%{
872 "godmode" => params["godmode"],
873 "reading_user" => reading_user
876 fetch_activities(recipients, params)
# Fetches the "Create" and "Announce" activities authored by `user` as seen by
# `reading_user`, passing the author's pinned activity ids along for restrict_pinned/2.
880 def fetch_user_activities(user, reading_user, params \\ %{}) do
883 |> Map.put("type", ["Create", "Announce"])
884 |> Map.put("user", reading_user)
885 |> Map.put("actor_id", user.ap_id)
886 |> Map.put("pinned_activity_ids", user.pinned_activities)
# Block and mute filtering depends on whether the reader blocks the author.
# NOTE(review): which branch sets the two options below is not visible in this excerpt.
889 if User.blocks?(reading_user, user) do
893 |> Map.put("blocking_user", reading_user)
894 |> Map.put("muting_user", reading_user)
898 user_activities_recipients(%{
899 "godmode" => params["godmode"],
900 "reading_user" => reading_user
903 fetch_activities(recipients, params)
# Fetches "Create" and "Announce" activities for `reading_user` using offset pagination
# (fetch_activities/3 is called with :offset).
907 def fetch_statuses(reading_user, params) do
910 |> Map.put("type", ["Create", "Announce"])
913 user_activities_recipients(%{
914 "godmode" => params["godmode"],
915 "reading_user" => reading_user
918 fetch_activities(recipients, params, :offset)
# Computes the recipient list used by the fetch_user_* functions.
# NOTE(review): the body of the "godmode" clause is not visible in this excerpt.
922 defp user_activities_recipients(%{"godmode" => true}) do
926 defp user_activities_recipients(%{"reading_user" => reading_user}) do
# One branch: the public address, the reader and everything from User.following/1.
928 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
# Other branch: only the public address.
# NOTE(review): the condition selecting between the two is not visible in this excerpt.
930 [Constants.as_public()]
# Keeps only activities whose id is greater than "since_id". An empty or absent
# "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
# Drops activities whose object carries any of the "tag_reject" tags. Needs the joined
# object, so combining it with "skip_preload" raises. An empty or non-list value
# leaves the query untouched.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
# Keeps activities whose object carries all of the "tag_all" tags. Needs the joined
# object, so combining it with "skip_preload" raises. An empty or non-list value
# leaves the query untouched.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
# Keeps activities whose object carries the given tag (binary) or any of the given
# tags (list). Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag),
  do: where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag),
  do: where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))

defp restrict_tag(query, _), do: query
# Limits the query to activities addressed to any of `recipients` (array overlap on
# activity.recipients). With a user, activities authored by that user are included as
# well. An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil),
  do: where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
# With "local_only" => true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
# With "actor_id" set, keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
# Filters on the activity "type": a binary is compared for equality, any other value
# is matched with ANY(...).
defp restrict_type(query, %{"type" => type}) when is_binary(type),
  do: where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))

defp restrict_type(query, %{"type" => type}),
  do: where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))

defp restrict_type(query, _), do: query
# With "state" set, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _), do: query
# With "favorited_by" set, keeps activities whose object lists that ap_id in "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}),
  do: where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))

defp restrict_favorited_by(query, _), do: query
# With "only_media" set to "true" or "1", keeps activities whose object "attachment" is
# not the empty list. Needs the joined object, so combining it with "skip_preload" raises.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
# With "exclude_replies" set to "true" or "1", keeps only activities whose object has no
# "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"],
  do: where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))

defp restrict_replies(query, _), do: query
# With "exclude_reblogs" set to "true" or "1", drops "Announce" activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"],
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _), do: query
# Hides activities involving accounts muted by "muting_user".
# "with_muted" (true, "true" or "1") disables the filter altogether.
1069 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
1071 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
# A preloaded list in opts takes precedence over looking the mutes up.
1072 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
# Excludes activities authored by a muted account or addressed ("to") to one.
1075 from([activity] in query,
1076 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1077 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
# The thread-mute condition relies on the :thread_mute binding, so it is only added
# when preloading was not skipped.
1080 unless opts["skip_preload"] do
1081 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1087 defp restrict_muted(query, _), do: query
# Hides activities involving accounts or domains blocked by "blocking_user".
1089 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
# A preloaded list in opts takes precedence over looking the blocks up.
1090 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1091 domain_blocks = user.domain_blocks || []
1093 following_ap_ids = User.get_friends_ap_ids(user)
# The object join is added only if the query does not already have it.
1096 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1099 [activity, object: o] in query,
# Excludes activities authored by, or addressed to, a blocked account.
1100 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1101 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
# Excludes announces addressed ("to") to a matching account.
# NOTE(review): the bound values of the remaining fragments are not visible in this
# excerpt; presumably domain_blocks and following_ap_ids - confirm.
1104 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1111 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1119 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1128 defp restrict_blocked(query, _), do: query
# Excludes activities whose "cc" (treated as empty when missing) contains the public
# address.
# NOTE(review): the surrounding query construction is not visible in this excerpt.
1130 defp restrict_unlisted(query) do
1135 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1137 ^[Constants.as_public()]
# With "pinned" => "true", keeps only the activities listed in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _), do: query
# Hides "Announce" activities from accounts whose reblogs "muting_user" has muted.
1148 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
# A preloaded list in opts takes precedence over looking the mutes up.
1149 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
# NOTE(review): the values bound to this fragment are not visible in this excerpt.
1155 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1163 defp restrict_muted_reblogs(query, _), do: query
# With "instance" set, keeps only activities whose actor is in `users`, a set selected
# by matching nicknames that end in "@<instance>".
# NOTE(review): how `users` is selected from the matching rows is not visible in this
# excerpt.
1165 defp restrict_instance(query, %{"instance" => instance}) do
1170 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1174 from(activity in query, where: activity.actor in ^users)
1177 defp restrict_instance(query, _), do: query
# Drops activities whose object type is "Answer" unless "include_poll_votes" is true.
1179 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1181 defp exclude_poll_votes(query, _) do
# The filter needs the :object binding.
# NOTE(review): the branch taken when the binding is absent is not visible in this
# excerpt.
1182 if has_named_binding?(query, :object) do
1183 from([activity, object: o] in query,
1184 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# With a binary "exclude_id", drops the activity that has this id.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _), do: query
# Adds the object preload via Activity.with_preloaded_object/1 unless "skip_preload"
# is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
# Adds the bookmark preload for `opts["user"]` via Activity.with_preloaded_bookmark/2
# unless "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
# Adds the report-notes preload via Activity.with_preloaded_report_notes/1 only when
# "preload_report_notes" is true.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
# Applies Activity.with_set_thread_muted_field/2 for the muting user (falling back to
# `opts["user"]`) unless "skip_preload" is true.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, viewer)
end
# Orders by id according to the `:order` option (:desc or :asc); without it the query
# keeps whatever ordering it already has.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
# Loads the relationship ap_id lists needed by the block/mute filters with a single call
# to User.outgoing_relationships_ap_ids/2 and returns three option maps, one each for
# restrict_blocked/2, restrict_muted/2 and restrict_muted_reblogs/2.
1237 defp fetch_activities_query_ap_ids_ops(opts) do
1238 source_user = opts["muting_user"]
1239 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
# Blocks are considered only when the blocking user is the same as the muting user.
# NOTE(review): the values appended by the two branches are not visible in this excerpt.
1241 ap_id_relationships =
1242 ap_id_relationships ++
1243 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1249 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
# `opts` is the second argument of Map.merge/2, so caller-supplied lists win over the
# preloaded ones.
1251 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1252 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1254 restrict_muted_reblogs_opts =
1255 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1257 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query: preloads first, then every restrict_*/exclude_*
# filter in turn. Each filter is a no-op unless its option is present in `opts`.
1260 def fetch_activities_query(recipients, opts \\ %{}) do
# The block/mute filters receive option maps enriched with preloaded ap_id lists.
1261 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1262 fetch_activities_query_ap_ids_ops(opts)
# Instance-wide setting consumed by restrict_thread_visibility/3.
1265 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1269 |> maybe_preload_objects(opts)
1270 |> maybe_preload_bookmarks(opts)
1271 |> maybe_preload_report_notes(opts)
1272 |> maybe_set_thread_muted_field(opts)
1273 |> maybe_order(opts)
1274 |> restrict_recipients(recipients, opts["user"])
1275 |> restrict_tag(opts)
1276 |> restrict_tag_reject(opts)
1277 |> restrict_tag_all(opts)
1278 |> restrict_since(opts)
1279 |> restrict_local(opts)
1280 |> restrict_actor(opts)
1281 |> restrict_type(opts)
1282 |> restrict_state(opts)
1283 |> restrict_favorited_by(opts)
1284 |> restrict_blocked(restrict_blocked_opts)
1285 |> restrict_muted(restrict_muted_opts)
1286 |> restrict_media(opts)
# NOTE(review): restrict_visibility/2 returns the Logger.error/1 result for an invalid
# value, which the next step would then receive instead of a query.
1287 |> restrict_visibility(opts)
1288 |> restrict_thread_visibility(opts, config)
1289 |> restrict_replies(opts)
1290 |> restrict_reblogs(opts)
1291 |> restrict_pinned(opts)
1292 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1293 |> restrict_instance(opts)
1294 |> Activity.restrict_deactivated_users()
1295 |> exclude_poll_votes(opts)
1296 |> exclude_visibility(opts)
@doc """
Fetches a page of activities addressed to `recipients`.

The list memberships of `opts["user"]` are added to the recipients before
querying. The paginated result is reversed, and the memberships are then
passed to `maybe_update_cc/3` together with the user.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, user)
end
@doc """
Fetches the activities a user has favourited, most recently favourited first.

The query starts from the user's `Like` activities, joins each liked object
and its activity, and orders by the id of the `Like` itself. Pagination is
told to skip its own ordering so that this order is kept.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  pagination_params = Map.merge(params, %{"skip_order" => true})

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc: like.id)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination, :object_activity)
end
# Makes list-addressed activities look addressed to the viewing user.
#
# For every activity whose non-empty "bcc" contains one of the user's
# `list_memberships`, the user's AP id is prepended to the activity's "cc".
# Activities without a matching "bcc" are returned unchanged, as is the whole
# list when there are no memberships or no `%User{}` is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent or a bare value; wrap it so the result is always a
        # proper list rather than an improper `[ap_id | nil]` cons cell.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
@doc """
Restricts `query` to activities whose recipients overlap `recipients`, or
whose recipients overlap `recipients_with_public` and also include the public
address.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public = Constants.as_public()

  from(a in query,
    where:
      fragment("? && ?", a.recipients, ^recipients) or
        (fragment("? && ?", a.recipients, ^recipients_with_public) and
           ^public in a.recipients)
  )
end
@doc """
Fetches a page of activities bounded by `recipients` and
`recipients_with_public` (see `fetch_activities_bounded_query/3`).

The base query is built with an empty recipient list, so the recipient
restriction comes from the bounded query. The paginated result is reversed
before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is set it is recorded under the `"actor"` key of the
object data. Anything other than `{:ok, data}` from the store step is returned
unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      actor = opts[:actor]
      obj_data = if actor, do: Map.put(data, "actor", actor), else: data

      Repo.insert(%Object{data: obj_data})

    other ->
      other
  end
end
# Extracts a single URL from an actor's "url" property, which may be a plain
# string, a map with an "href", or a list of either (the first element is
# used). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
# Converts a fetched ActivityPub actor object (a string-keyed map) into the
# atom-keyed attribute map used to build a remote user.
#
# The actor document comes from a remote server, so malformed parts are
# skipped instead of raising:
#   * "icon" / "image" that are not maps with a "url" give no avatar/banner;
#   * "attachment" entries that are not `PropertyValue` maps are ignored;
#   * "tag" entries that are not Emoji maps with an icon url and a string name
#     are ignored.
#
# Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  avatar = actor_image(data["icon"])
  banner = actor_image(data["image"])

  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Shortcode (colons trimmed) => image url; a later duplicate overrides an
  # earlier one.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # Read before the object is passed through the Transmogrifier fix-up, as in
  # the original ordering.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  {:ok, Map.put(user_data, :nickname, nickname)}
end

# Builds the stored image representation from an actor's "icon"/"image"
# property, or nil when the property is missing or has no usable "url".
defp actor_image(%{"url" => url}) when url not in [nil, false] do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp actor_image(_), do: nil
@doc """
Fetches the remote following and follower collections of `user` and derives
the follow counters and privacy flags from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:following_count` and `:follower_count`. An `{:error, _}` from any step is
returned as-is; any other unexpected value is wrapped in `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
# Returns the counter unchanged when it is an integer; any other value
# (missing, string, float, ...) is treated as 0.
defp normalize_counter(value) do
  case value do
    count when is_integer(count) -> count
    _other -> 0
  end
end
# Merges freshly fetched follow information into `data[:info]` when the
# `[:instance, :external_user_synchronization]` setting is exactly `true`.
#
# When the setting is exactly `false`, `data` is returned untouched. Any other
# outcome (including a failed fetch) is logged as an error and `data` is
# returned unchanged, so a counter failure never aborts the caller.
defp maybe_update_follow_information(data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {:ok, fetched_info} <- fetch_follow_information_for_user(data) do
    merged_info = Map.merge(data[:info] || %{}, fetched_info)
    Map.put(data, :info, merged_info)
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
# Decides whether a remote follow collection is private.
#
#   * "first" embedded as a collection page          -> {:ok, false}
#   * "first" is a reference: it is fetched; a page  -> {:ok, false},
#     a 401/403 response                              -> {:ok, true},
#     other failures are returned as {:error, _}
#   * no "first" at all                               -> {:ok, true}
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first_page}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first_page) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs a fetched actor object through `MRF.filter/1` and converts the result
into user attributes.

Returns `{:ok, user_data}`; any value that is not an `{:ok, _}` tuple from
either step is wrapped as `{:error, value}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes, with
follow information merged in when available.

Returns `{:ok, data}` on success. Every failure is logged and returned wrapped
as `{:error, failure}`; the "Object has been deleted" error is logged at debug
level, everything else at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = failure ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}

    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

A cached user that is not yet AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
an existing cached user is updated from the fetched data, and a new user is
inserted and cached when none exists. A fetch failure is returned wrapped as
`{:error, failure}`.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        {:error, failure}
    end
  end
end
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id it
yields.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or carries
no non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity; currently this only delegates to
# `contain_broken_threads/2`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1604 def fetch_direct_messages_query do
1606 |> restrict_type(%{"type" => "Create"})
1607 |> restrict_visibility(%{visibility: "direct"})
1608 |> order_by([activity], asc: activity.id)