1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
# Collects the addressees of an activity from its "to", "cc" and "bcc" fields,
# each defaulting to an empty list when the key is absent.
# Callers destructure the result as a 3-tuple whose first element is the recipient list.
#
# Announce clause: every addressed id is resolved with `User.get_cached_by_ap_id/1`
# and, for resolved users, kept according to `User.following?(user, actor)`.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = User.get_cached_by_ap_id(data["actor"])
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
47 user -> User.following?(user, actor)
# Create clause: the actor is added to the recipients and duplicates are removed.
# NOTE(review): a missing "actor" defaults to `[]`, which is then wrapped and
# concatenated as a stray `[]` element — confirm "actor" is always present here.
54 defp get_recipients(%{"type" => "Create"} = data) do
55 to = Map.get(data, "to", [])
56 cc = Map.get(data, "cc", [])
57 bcc = Map.get(data, "bcc", [])
58 actor = Map.get(data, "actor", [])
59 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Fallback clause: plain concatenation of to/cc/bcc, without de-duplication.
63 defp get_recipients(data) do
64 to = Map.get(data, "to", [])
65 cc = Map.get(data, "cc", [])
66 bcc = Map.get(data, "bcc", [])
67 recipients = Enum.concat([to, cc, bcc])
# Checks that the user behind `actor` (an AP id) is not deactivated.
# The lookup only happens when `actor` is non-nil.
# NOTE(review): `user <- ...` matches anything, including `nil`, so an unknown
# actor would reach `user.deactivated` on `nil` — confirm callers rule that out.
71 defp check_actor_is_active(actor) do
72 if not is_nil(actor) do
73 with user <- User.get_cached_by_ap_id(actor),
74 false <- user.deactivated do
# Returns whether the embedded object's "content" fits within the configured
# `[:instance, :remote_limit]`. Length is measured with `String.length/1`
# (graphemes, not bytes).
84 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
85 limit = Config.get([:instance, :remote_limit])
86 String.length(content) <= limit
# Activities without a non-nil object content always pass the check.
89 defp check_remote_limit(_), do: true
# Bumps the actor's note count when `object` is public (per `is_public?/1`);
# otherwise returns `{:ok, actor}` unchanged so both branches share the same shape.
91 def increase_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Lowers the actor's note count when `object` is public (per `is_public?/1`);
# otherwise returns `{:ok, actor}` unchanged.
95 def decrease_note_count_if_public(actor, object) do
96 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# For create data whose object carries an "inReplyTo", increments the reply
# counter of the replied-to object, but only when the replying object is public.
99 def increase_replies_count_if_reply(%{
100 "object" => %{"inReplyTo" => reply_ap_id} = object,
103 if is_public?(object) do
104 Object.increase_replies_count(reply_ap_id)
# Anything that does not match the reply shape is a no-op.
108 def increase_replies_count_if_reply(_create_data), do: :noop
# Counterpart of `increase_replies_count_if_reply/1` for an `%Object{}`:
# decrements the reply counter of the "inReplyTo" target when the object is public.
110 def decrease_replies_count_if_reply(%Object{
111 data: %{"inReplyTo" => reply_ap_id} = object
113 if is_public?(object) do
114 Object.decrease_replies_count(reply_ap_id)
# Objects that are not replies are left alone.
118 def decrease_replies_count_if_reply(_object), do: :noop
# When the created object has both "inReplyTo" and "name" (the shape matched
# here for a poll answer), records the vote on the replied-to object via
# `Object.increase_vote_count/3`.
120 def increase_poll_votes_if_vote(%{
121 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
125 Object.increase_vote_count(reply_ap_id, name, actor)
# Non-matching create data is a no-op.
128 def increase_poll_votes_if_vote(_create_data), do: :noop
# Inserts `object` as an `%Activity{}` row.
# `meta` must contain `:local` (`Keyword.fetch!/2` raises otherwise).
# Recipients come from `get_recipients/1`; the actor from `object["actor"]`.
# NOTE(review): the spec advertises a 2-tuple, but the visible success value is
# the 3-tuple `{:ok, activity, meta}` — confirm and align the spec.
130 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
131 def persist(object, meta) do
132 with local <- Keyword.fetch!(meta, :local),
133 {recipients, _, _} <- get_recipients(object),
135 Repo.insert(%Activity{
138 recipients: recipients,
139 actor: object["actor"]
141 {:ok, activity, meta}
# Validates and stores an activity map.
#
# Parameters:
#   map                - the activity data
#   local              - passed through to the stored activity (default true)
#   fake               - when true, the `{:fake, true, ...}` branch builds an
#                        in-memory `%Activity{}` instead of taking the insert path
#   bypass_actor_check - skips `check_actor_is_active/1` when true
#
# Pipeline (in order): skip if `Activity.normalize/1` already finds the activity,
# fill defaults, check the actor, enforce the remote content limit, run MRF,
# compute recipients, branch on `fake`, run the containment check, insert the
# embedded object.
145 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
146 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
147 with nil <- Activity.normalize(map),
148 map <- lazy_put_activity_defaults(map, fake),
149 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
150 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
151 {:ok, map} <- MRF.filter(map),
152 {recipients, _, _} = get_recipients(map),
# Tagging the tuple lets the else-branch tell a fake insert apart from other mismatches.
153 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
154 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
155 {:ok, map, object} <- insert_full_object(map) do
161 recipients: recipients
164 |> maybe_create_activity_expiration()
166 # Splice in the child object if we have one.
168 if not is_nil(object) do
169 Map.put(activity, :object, object)
# Follow-up work after the insert: background data fetch, notifications,
# conversation bump and streaming of the affected participations.
174 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
176 Notification.create_notifications(activity)
178 conversation = create_or_bump_conversation(activity, map["actor"])
179 participations = get_participations(conversation)
181 stream_out_participations(participations)
# `Activity.normalize/1` returned an existing activity (first `with` step did not match nil).
184 %Activity{} = activity ->
187 {:fake, true, map, recipients} ->
188 activity = %Activity{
192 recipients: recipients,
196 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# For a successful insert whose activity data carries "expires_at", schedules
# the expiration through `ActivityExpiration.create/2`.
204 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
205 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
# Any other result (errors, or activities without "expires_at") passes through untouched.
210 defp maybe_create_activity_expiration(result), do: result
# Creates or bumps the conversation for `activity`, then marks it as read for
# the acting user (resolved from the `actor` AP id).
# `Participation.mark_as_read/2` is a bare expression in the `with`, so its
# result does not influence matching.
212 defp create_or_bump_conversation(activity, actor) do
213 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
214 %User{} = user <- User.get_cached_by_ap_id(actor),
215 Participation.mark_as_read(user, conversation) do
# Returns the participations of a conversation wrapped in `{:ok, conversation}`.
# `force: true` reloads the association even if it was already loaded.
220 defp get_participations({:ok, conversation}) do
222 |> Repo.preload(:participations, force: true)
223 |> Map.get(:participations)
# Any other input (e.g. a failed conversation step) yields no participations.
226 defp get_participations(_), do: []
# Arity 1: preloads each participation's user and pushes the participations to
# the "participation" stream.
228 def stream_out_participations(participations) do
231 |> Repo.preload(:user)
233 Streamer.stream("participation", participations)
# Arity 2: for an object with a "context", finds the matching conversation and
# streams its participations, but only if a latest activity id can still be
# found for that context with `user` passed as "blocking_user".
236 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
237 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
238 conversation = Repo.preload(conversation, :participations),
240 fetch_latest_activity_id_for_context(conversation.ap_id, %{
242 "blocking_user" => user
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
# Objects without a context are ignored.
250 def stream_out_participations(_, _), do: :noop
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`.
252 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
253 when data_type in ["Create", "Announce", "Delete"] do
255 |> Topics.get_activity_topics()
256 |> Streamer.stream(activity)
# Catch-all clause for every other activity type.
259 def stream_out(_activity) do
# Public entry point for creating a "Create" activity.
# Runs `do_create/2` inside a database transaction so that a
# `Repo.rollback/1` in the helper undoes the partial work.
# `fake` is forwarded to `do_create/2`.
263 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
264 def create(params, fake \\ false) do
265 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body of `create/2`.
# Required params: :to, :actor, :context, :object. Optional: :additional,
# :local, :published.
270 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
271 additional = params[:additional] || %{}
# `local` is true unless the caller passed exactly `false` (nil counts as true).
273 local = !(params[:local] == false)
274 published = params[:published]
# In the :benchmark environment the post-insert steps after the
# `:quick_insert` checkpoint are skipped.
275 quick_insert? = Config.get([:env]) == :benchmark
279 %{to: to, actor: actor, published: published, context: context, object: object},
282 {:ok, activity} <- insert(create_data, local, fake),
# Fake activities stop here; reply/vote counters and federation are not touched.
283 {:fake, false, activity} <- {:fake, fake, activity},
284 _ <- increase_replies_count_if_reply(create_data),
285 _ <- increase_poll_votes_if_vote(create_data),
286 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
287 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
288 :ok <- maybe_federate(activity) do
291 {:quick_insert, true, activity} ->
294 {:fake, true, activity} ->
# Remaining failures abort the surrounding transaction.
298 Repo.rollback(message)
# Inserts and federates a listen activity built from
# :to, :actor, :context, :object (required) and :additional, :local, :published (optional).
302 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
303 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
304 additional = params[:additional] || %{}
# `local` is true unless the caller passed exactly `false` (nil counts as true).
306 local = !(params[:local] == false)
307 published = params[:published]
311 %{to: to, actor: actor, published: published, context: context, object: object},
314 {:ok, activity} <- insert(listen_data, local),
315 :ok <- maybe_federate(activity) do
# Convenience wrapper: emits an "Accept" activity via `accept_or_reject/2`.
320 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
321 def accept(params) do
322 accept_or_reject("Accept", params)
# Convenience wrapper: emits a "Reject" activity via `accept_or_reject/2`.
325 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
326 def reject(params) do
327 accept_or_reject("Reject", params)
# Builds, inserts and federates an activity of the given `type`
# ("Accept" or "Reject" from the wrappers above).
# Required params: :to, :actor (a struct with `ap_id`), :object.
# Optional: :local (default true) and :activity_id, which becomes the activity "id"
# through `Utils.maybe_put/3`.
330 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
331 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
332 local = Map.get(params, :local, true)
333 activity_id = Map.get(params, :activity_id, nil)
336 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
337 |> Utils.maybe_put("id", activity_id),
338 {:ok, activity} <- insert(data, local),
339 :ok <- maybe_federate(activity) do
# Inserts and federates an update activity.
# Required params: :to, :cc, :actor, :object. Optional: :local, :activity_id.
344 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
# `local` is true unless the caller passed exactly `false` (nil counts as true).
346 local = !(params[:local] == false)
347 activity_id = params[:activity_id]
356 data <- Utils.maybe_put(data, "id", activity_id),
357 {:ok, activity} <- insert(data, local),
358 :ok <- maybe_federate(activity) do
# Public entry point for adding an emoji reaction to `object` on behalf of `user`.
# Delegates to `do_react_with_emoji/4` inside a database transaction.
# `options` is forwarded to the helper, which reads :local and :activity_id from it.
363 @spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
364 {:ok, Activity.t(), Object.t()} | {:error, any()}
365 def react_with_emoji(user, object, emoji, options \\ []) do
366 with {:ok, result} <-
367 Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
# Transaction body of `react_with_emoji/4`: validates that `emoji` is a unicode
# emoji, inserts the reaction activity, records it on the object, then federates.
# Options: :local (default true), :activity_id (default nil).
372 defp do_react_with_emoji(user, object, emoji, options) do
373 with local <- Keyword.get(options, :local, true),
374 activity_id <- Keyword.get(options, :activity_id, nil),
375 true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
376 reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
377 {:ok, activity} <- insert(reaction_data, local),
378 {:ok, object} <- add_emoji_reaction_to_object(activity, object),
379 :ok <- maybe_federate(activity) do
380 {:ok, activity, object}
# A non-emoji string is reported as `{:error, false}` without rolling back;
# tagged errors abort the transaction.
382 false -> {:error, false}
383 {:error, error} -> Repo.rollback(error)
# Public entry point for undoing the emoji reaction identified by `reaction_id`.
# Delegates to `do_unreact_with_emoji/3` inside a database transaction.
387 @spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
388 {:ok, Activity.t(), Object.t()} | {:error, any()}
389 def unreact_with_emoji(user, reaction_id, options \\ []) do
390 with {:ok, result} <-
391 Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
# Transaction body of `unreact_with_emoji/3`.
# Options: :local (default true), :activity_id (default nil).
396 defp do_unreact_with_emoji(user, reaction_id, options) do
397 with local <- Keyword.get(options, :local, true),
398 activity_id <- Keyword.get(options, :activity_id, nil),
399 user_ap_id <- user.ap_id,
# The pin (`^user_ap_id`) ensures the reaction being undone belongs to `user`.
400 %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
401 object <- Object.normalize(reaction_activity),
402 unreact_data <- make_undo_data(user, reaction_activity, activity_id),
403 {:ok, activity} <- insert(unreact_data, local),
404 {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
405 :ok <- maybe_federate(activity) do
406 {:ok, activity, object}
# Tagged errors abort the surrounding transaction.
408 {:error, error} -> Repo.rollback(error)
# Public entry point for removing `actor`'s like from `object`.
# Delegates to `do_unlike/4` inside a database transaction.
# `activity_id` and `local` are forwarded to the helper.
412 @spec unlike(User.t(), Object.t(), String.t() | nil, boolean()) ::
413 {:ok, Activity.t(), Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
414 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
415 with {:ok, result} <-
416 Repo.transaction(fn -> do_unlike(actor, object, activity_id, local) end) do
# Transaction body of `unlike/4`: finds the existing like, inserts the unlike
# activity, deletes the like activity, removes the like from the object, then
# federates. Succeeds with `{:ok, unlike_activity, like_activity, object}`.
421 defp do_unlike(actor, object, activity_id, local) do
422 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
423 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
424 {:ok, unlike_activity} <- insert(unlike_data, local),
425 {:ok, _activity} <- Repo.delete(like_activity),
426 {:ok, object} <- remove_like_from_object(like_activity, object),
427 :ok <- maybe_federate(unlike_activity) do
428 {:ok, unlike_activity, like_activity, object}
# Tagged errors abort the surrounding transaction.
431 {:error, error} -> Repo.rollback(error)
# Public entry point for announcing `object` as `user`.
# The clause requires a user with an `ap_id` and an object whose data has an "id".
# Delegates to `do_announce/5` inside a database transaction.
435 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
436 {:ok, Activity.t(), Object.t()} | {:error, any()}
438 %User{ap_id: _} = user,
439 %Object{data: %{"id" => _}} = object,
444 with {:ok, result} <-
445 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
# Transaction body of `announce/5`.
450 defp do_announce(user, object, activity_id, local, public) do
451 with true <- is_announceable?(object, user, public),
# Re-read the object by id before building the announce.
# NOTE(review): presumably to work on fresh data — confirm.
452 object <- Object.get_by_id(object.id),
453 announce_data <- make_announce_data(user, object, activity_id, public),
454 {:ok, activity} <- insert(announce_data, local),
455 {:ok, object} <- add_announce_to_object(activity, object),
456 :ok <- maybe_federate(activity) do
457 {:ok, activity, object}
# A non-announceable object is reported as `{:error, false}` without rolling
# back; tagged errors abort the transaction.
459 false -> {:error, false}
460 {:error, error} -> Repo.rollback(error)
# Public entry point for undoing an announce.
# Delegates to `do_unannounce/4` inside a database transaction.
464 @spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
465 {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
472 with {:ok, result} <-
473 Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
# Transaction body of `unannounce/4`: finds the existing announce, inserts the
# undo activity, federates it, deletes the announce activity and removes the
# announce from the object.
# NOTE(review): here federation runs before the delete/remove steps, whereas
# `do_unlike/4` federates last — confirm the ordering difference is intended.
478 defp do_unannounce(actor, object, activity_id, local) do
479 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
480 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
481 {:ok, unannounce_activity} <- insert(unannounce_data, local),
482 :ok <- maybe_federate(unannounce_activity),
483 {:ok, _activity} <- Repo.delete(announce_activity),
484 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
485 {:ok, unannounce_activity, object}
# Tagged errors abort the surrounding transaction.
488 {:error, error} -> Repo.rollback(error)
# Public entry point for `follower` following `followed`.
# Delegates to `do_follow/4` inside a database transaction.
492 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
493 {:ok, Activity.t()} | {:error, any()}
494 def follow(follower, followed, activity_id \\ nil, local \\ true) do
495 with {:ok, result} <-
496 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
# Transaction body of `follow/4`: builds the follow data, inserts it and federates it.
501 defp do_follow(follower, followed, activity_id, local) do
502 with data <- make_follow_data(follower, followed, activity_id),
503 {:ok, activity} <- insert(data, local),
504 :ok <- maybe_federate(activity) do
# Tagged errors abort the surrounding transaction.
507 {:error, error} -> Repo.rollback(error)
# Public entry point for `follower` unfollowing `followed`.
# Delegates to `do_unfollow/4` inside a database transaction.
# The spec allows `nil` as a result besides the usual tagged tuples.
511 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
512 {:ok, Activity.t()} | nil | {:error, any()}
513 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
514 with {:ok, result} <-
515 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of `unfollow/4`: takes the latest follow activity, marks its
# state as "cancelled", then inserts and federates the unfollow activity.
520 defp do_unfollow(follower, followed, activity_id, local) do
521 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
522 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
523 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
524 {:ok, activity} <- insert(unfollow_data, local),
525 :ok <- maybe_federate(activity) do
# Tagged errors abort the surrounding transaction.
529 {:error, error} -> Repo.rollback(error)
# Public entry point for deleting a user or an object.
# Delegates to the matching `do_delete/2` clause inside a database transaction.
533 @spec delete(User.t() | Object.t(), keyword()) :: {:ok, User.t() | Object.t()} | {:error, any()}
534 def delete(entity, options \\ []) do
535 with {:ok, result} <- Repo.transaction(fn -> do_delete(entity, options) end) do
# User clause: builds an activity addressed to the user's follower collection
# whose object is the user as a "Person"; options are ignored.
540 defp do_delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _) do
542 "to" => [follower_address],
545 "object" => %{"type" => "Person", "id" => ap_id}
# Positional flags are local = true, fake = true, bypass_actor_check = true
# (see `insert/4`).
547 {:ok, activity} <- insert(data, true, true, true),
548 :ok <- maybe_federate(activity) do
# Object clause. Options: :local (default true), :activity_id (default nil),
# :actor (defaults to the object's own "actor").
553 defp do_delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
554 local = Keyword.get(options, :local, true)
555 activity_id = Keyword.get(options, :activity_id, nil)
556 actor = Keyword.get(options, :actor, actor)
558 user = User.get_cached_by_ap_id(actor)
# The activity is addressed to everyone the object was addressed to (to + cc).
559 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
561 with create_activity <- Activity.get_create_by_object_ap_id(id),
# `&&` keeps this nil when no Create activity was found for the object.
568 "deleted_activity_id" => create_activity && create_activity.id
570 |> maybe_put("id", activity_id),
571 {:ok, activity} <- insert(data, local, false),
572 {:ok, object, _create_activity} <- Object.delete(object),
573 stream_out_participations(object, user),
574 _ <- decrease_replies_count_if_reply(object),
575 {:ok, _actor} <- decrease_note_count_if_public(user, object),
576 :ok <- maybe_federate(activity) do
# Tombstone clause: the object is already a Tombstone, so this only queries for
# activities of type "Delete" by object id.
584 defp do_delete(%Object{data: %{"type" => "Tombstone", "id" => ap_id}}, _) do
587 |> Activity.Queries.by_object_id()
588 |> Activity.Queries.by_type("Delete")
# Public entry point for `blocker` blocking `blocked`.
# Delegates to `do_block/4` inside a database transaction.
594 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
595 {:ok, Activity.t()} | {:error, any()}
596 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
597 with {:ok, result} <-
598 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
# Transaction body of `block/4`. Behavior is driven by two settings:
#   [:activitypub, :unfollow_blocked] - unfollow the blocked user first
#   [:activitypub, :outgoing_blocks]  - only then is a block activity inserted and federated
603 defp do_block(blocker, blocked, activity_id, local) do
604 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
605 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
607 if unfollow_blocked do
608 follow_activity = fetch_latest_follow(blocker, blocked)
# NOTE(review): the result of `unfollow/4` is discarded, and `unfollow/4` opens
# its own `Repo.transaction` inside the one started by `block/4` — confirm intended.
609 if follow_activity, do: unfollow(blocker, blocked, nil, local)
612 with true <- outgoing_blocks,
613 block_data <- make_block_data(blocker, blocked, activity_id),
614 {:ok, activity} <- insert(block_data, local),
615 :ok <- maybe_federate(activity) do
# Tagged errors abort the surrounding transaction.
618 {:error, error} -> Repo.rollback(error)
# Public entry point for `blocker` unblocking `blocked`.
# Delegates to `do_unblock/4` inside a database transaction.
# The spec allows `nil` as a result besides the usual tagged tuples.
622 @spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
623 {:ok, Activity.t()} | {:error, any()} | nil
624 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
625 with {:ok, result} <-
626 Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
# Transaction body of `unblock/4`: takes the latest block activity, builds the
# unblock data from it, then inserts and federates the unblock activity.
631 defp do_unblock(blocker, blocked, activity_id, local) do
632 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
633 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
634 {:ok, activity} <- insert(unblock_data, local),
635 :ok <- maybe_federate(activity) do
# Tagged errors abort the surrounding transaction.
639 {:error, error} -> Repo.rollback(error)
# Files a report (flag) activity and e-mails it to the instance's superusers.
643 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
# `local` and `forward` are true unless the caller passed exactly `false` (nil counts as true).
654 local = !(params[:local] == false)
655 forward = !(params[:forward] == false)
657 additional = params[:additional] || %{}
# Two addressing variants: one cc's the reported account, the other addresses nobody.
# NOTE(review): presumably selected by `forward` — confirm against the full source.
661 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
663 Map.merge(additional, %{"to" => [], "cc" => []})
666 with flag_data <- make_flag_data(params, additional),
667 {:ok, activity} <- insert(flag_data, local),
# Federation receives the activity returned by `strip_report_status_data/1`,
# not the stored one.
668 {:ok, stripped_activity} <- strip_report_status_data(activity),
669 :ok <- maybe_federate(stripped_activity) do
# Only superusers with an e-mail address are notified; delivery is asynchronous.
670 User.all_superusers()
671 |> Enum.filter(fn user -> not is_nil(user.email) end)
672 |> Enum.each(fn superuser ->
674 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
675 |> Pleroma.Emails.Mailer.deliver_async()
# Records that `origin` has moved to `target`.
# The move is only accepted when the target lists the origin's AP id in its
# `also_known_as`. After the insert, a "move_following" background job is
# enqueued with both user ids.
682 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
683 def move(%User{} = origin, %User{} = target, local \\ true) do
686 "actor" => origin.ap_id,
687 "object" => origin.ap_id,
688 "target" => target.ap_id
691 with true <- origin.ap_id in target.also_known_as,
692 {:ok, activity} <- insert(params, local) do
# NOTE(review): the result of `maybe_federate/1` is ignored here, unlike the
# `:ok <- maybe_federate(...)` checks elsewhere in this module — confirm intended.
693 maybe_federate(activity)
695 BackgroundWorker.enqueue("move_following", %{
696 "origin_id" => origin.id,
697 "target_id" => target.id
702 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities belonging to `context`, newest first.
# The query filters on activity type and context, applies block restrictions,
# restricts by recipients and (unless opted in) excludes poll votes.
707 def fetch_activities_for_context_query(context, opts) do
708 public = [Constants.as_public()]
# With a user in opts, recipients are the user, everyone `User.following/1`
# returns for them, and the public address.
712 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
715 from(activity in Activity)
716 |> maybe_preload_objects(opts)
717 |> maybe_preload_bookmarks(opts)
718 |> maybe_set_thread_muted_field(opts)
719 |> restrict_blocked(opts)
720 |> restrict_recipients(recipients, opts["user"])
724 "?->>'type' = ? and ?->>'context' = ?",
731 |> exclude_poll_votes(opts)
733 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using `fetch_activities_for_context_query/2`.
# `opts` defaults to an empty map.
736 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
737 def fetch_activities_for_context(context, opts \\ %{}) do
739 |> fetch_activities_for_context_query(opts)
# Returns the id of the latest activity in a context, or nil (per the spec).
# Preloading is skipped by default; because caller `opts` are merged last,
# a caller-supplied "skip_preload" overrides that default.
743 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
744 FlakeId.Ecto.CompatType.t() | nil
745 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
747 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
# Fetches a page of public, non-unlisted activities.
# The "user" option is dropped, so the recipient restriction only uses the
# public address. `pagination` selects the strategy (default :keyset).
753 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
754 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
755 opts = Map.drop(opts, ["user"])
757 [Constants.as_public()]
758 |> fetch_activities_query(opts)
759 |> restrict_unlisted()
760 |> Pagination.fetch_paginated(opts, pagination)
# Visibility names accepted by the visibility filters below.
763 @valid_visibilities ~w[direct unlisted public private]
# Keeps only activities whose computed visibility (SQL function
# `activity_visibility`) matches the `:visibility` option.
# List clause: every element must be a valid visibility, otherwise an error is logged.
765 defp restrict_visibility(query, %{visibility: visibility})
766 when is_list(visibility) do
767 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
773 "activity_visibility(?, ?, ?) = ANY (?)",
783 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility.
787 defp restrict_visibility(query, %{visibility: visibility})
788 when visibility in @valid_visibilities do
792 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Single invalid visibility: logged as an error.
796 defp restrict_visibility(_query, %{visibility: visibility})
797 when visibility not in @valid_visibilities do
798 Logger.error("Could not restrict visibility to #{visibility}")
# No `:visibility` option: query unchanged.
801 defp restrict_visibility(query, _visibility), do: query
# Filters on the "exclude_visibilities" option using the SQL function
# `activity_visibility`; mirrors `restrict_visibility/2` but is keyed by a string.
# List clause: every element must be a valid visibility, otherwise an error is logged.
803 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
804 when is_list(visibility) do
805 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
810 "activity_visibility(?, ?, ?) = ANY (?)",
818 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
823 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
824 when visibility in @valid_visibilities do
829 "activity_visibility(?, ?, ?) = ?",
# Single invalid, non-nil visibility: logged as an error (nil is let through
# to the catch-all clause).
838 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
839 when visibility not in [nil | @valid_visibilities] do
840 Logger.error("Could not exclude visibility to #{visibility}")
# Option absent or nil: query unchanged.
844 defp exclude_visibility(query, _visibility), do: query
# Applies thread containment via the SQL function `thread_visibility` for the
# requesting user. Two clauses match a `skip_thread_containment: true` flag,
# one on the config (third argument) and one on the user itself.
846 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
849 defp restrict_thread_visibility(
851 %{"user" => %User{skip_thread_containment: true}},
# Default for a known user: keep activities for which `thread_visibility`
# returns true for the user's AP id and the activity id.
856 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
859 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
# No user in opts: query unchanged.
863 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` ("actor_id") as seen by `reading_user`.
# No type filter is added here. Recipients are derived from the "godmode" flag
# and the reading user.
865 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
868 |> Map.put("user", reading_user)
869 |> Map.put("actor_id", user.ap_id)
872 user_activities_recipients(%{
873 "godmode" => params["godmode"],
874 "reading_user" => reading_user
877 fetch_activities(recipients, params)
# Fetches the Create and Announce activities of `user` as seen by `reading_user`,
# also passing along the user's pinned activity ids.
881 def fetch_user_activities(user, reading_user, params \\ %{}) do
884 |> Map.put("type", ["Create", "Announce"])
885 |> Map.put("user", reading_user)
886 |> Map.put("actor_id", user.ap_id)
887 |> Map.put("pinned_activity_ids", user.pinned_activities)
# The block/mute options depend on whether the reader blocks this user.
890 if User.blocks?(reading_user, user) do
894 |> Map.put("blocking_user", reading_user)
895 |> Map.put("muting_user", reading_user)
899 user_activities_recipients(%{
900 "godmode" => params["godmode"],
901 "reading_user" => reading_user
904 fetch_activities(recipients, params)
# Fetches Create and Announce activities visible to `reading_user`.
# Unlike the per-user fetchers above, this uses :offset pagination.
908 def fetch_statuses(reading_user, params) do
911 |> Map.put("type", ["Create", "Announce"])
914 user_activities_recipients(%{
915 "godmode" => params["godmode"],
916 "reading_user" => reading_user
919 fetch_activities(recipients, params, :offset)
# Computes the recipient list used to restrict a user-timeline query.
# "godmode" => true has its own clause.
923 defp user_activities_recipients(%{"godmode" => true}) do
# Otherwise two lists are visible: the public address plus the reader and the
# users they follow, or the public address alone.
# NOTE(review): presumably chosen by whether `reading_user` is present — confirm.
927 defp user_activities_recipients(%{"reading_user" => reading_user}) do
929 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
931 [Constants.as_public()]
# Keeps only activities with an id greater than "since_id".
# An empty-string "since_id" is treated like an absent one.
935 defp restrict_since(query, %{"since_id" => ""}), do: query
937 defp restrict_since(query, %{"since_id" => since_id}) do
938 from(activity in query, where: activity.id > ^since_id)
941 defp restrict_since(query, _), do: query
# Drops activities whose object carries ANY of the tags in "tag_reject"
# (jsonb `?|` operator, escaped as `\\?|` inside the fragment).
# The filter reads the joined object, so combining it with "skip_preload" raises.
943 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
944 raise "Can't use the child object without preloading!"
947 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
948 when is_list(tag_reject) and tag_reject != [] do
950 [_activity, object] in query,
951 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
# Absent, empty or non-list option: query unchanged.
955 defp restrict_tag_reject(query, _), do: query
# Keeps activities whose object carries ALL of the tags in "tag_all"
# (jsonb `?&` operator, escaped as `\\?&` inside the fragment).
# The filter reads the joined object, so combining it with "skip_preload" raises.
957 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
958 raise "Can't use the child object without preloading!"
961 defp restrict_tag_all(query, %{"tag_all" => tag_all})
962 when is_list(tag_all) and tag_all != [] do
964 [_activity, object] in query,
965 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
# Absent, empty or non-list option: query unchanged.
969 defp restrict_tag_all(query, _), do: query
# Keeps activities whose object carries the requested tag(s).
# The filter reads the joined object, so combining it with "skip_preload" raises.
971 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
972 raise "Can't use the child object without preloading!"
# List of tags: match if ANY of them is present (jsonb `?|`).
975 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
977 [_activity, object] in query,
978 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
# Single tag: match if it is present (jsonb `?`).
982 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
984 [_activity, object] in query,
985 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
989 defp restrict_tag(query, _), do: query
# Keeps activities whose recipients overlap (`&&`) with the given list.
# An empty recipient list disables the restriction entirely.
991 defp restrict_recipients(query, [], _user), do: query
# Without a user: pure overlap check.
993 defp restrict_recipients(query, recipients, nil) do
994 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: additionally let the user's own activities through.
997 defp restrict_recipients(query, recipients, user) do
1000 where: fragment("? && ?", ^recipients, activity.recipients),
1001 or_where: activity.actor == ^user.ap_id
# With "local_only" => true, keeps only activities flagged as local.
1005 defp restrict_local(query, %{"local_only" => true}) do
1006 from(activity in query, where: activity.local == true)
1009 defp restrict_local(query, _), do: query
# With "actor_id" set, keeps only activities authored by that actor.
1011 defp restrict_actor(query, %{"actor_id" => actor_id}) do
1012 from(activity in query, where: activity.actor == ^actor_id)
1015 defp restrict_actor(query, _), do: query
# Filters on the activity's "type" field.
# A binary is compared for equality; any other value is passed to `ANY(?)`
# (callers in this module pass a list of type names).
1017 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
1018 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
1021 defp restrict_type(query, %{"type" => type}) do
1022 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
1025 defp restrict_type(query, _), do: query
# With "state" set, keeps only activities whose data "state" equals it.
1027 defp restrict_state(query, %{"state" => state}) do
1028 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
1031 defp restrict_state(query, _), do: query
# With "favorited_by" set to an AP id, keeps activities whose object lists
# that id in its "likes" (jsonb `?` operator). Requires the object binding.
1033 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
1035 [_activity, object] in query,
1036 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
1040 defp restrict_favorited_by(query, _), do: query
# With "only_media" truthy (true, "true" or "1"), keeps activities whose object
# has a non-empty "attachment" list.
# The filter reads the joined object, so combining it with "skip_preload" raises
# (this clause matches any "only_media" value).
1042 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
1043 raise "Can't use the child object without preloading!"
1046 defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
1048 [_activity, object] in query,
1049 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
1053 defp restrict_media(query, _), do: query
# Reply filtering, in three flavours.
# "exclude_replies" truthy: keep only objects that are not replies.
1055 defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
1057 [_activity, object] in query,
1058 where: fragment("?->>'inReplyTo' is null", object.data)
# "reply_visibility" => "self": keep non-replies, plus rows matching the
# `? = ANY(?)` membership test.
# NOTE(review): presumably the filtering user's AP id against the activity
# recipients — confirm.
1062 defp restrict_replies(query, %{
1063 "reply_filtering_user" => user,
1064 "reply_visibility" => "self"
1067 [activity, object] in query,
1070 "?->>'inReplyTo' is null OR ? = ANY(?)",
# "reply_visibility" => "following": keep non-replies, replies whose recipients
# overlap with the user and their cached friends, or rows matching the final
# `? = ?` equality.
1078 defp restrict_replies(query, %{
1079 "reply_filtering_user" => user,
1080 "reply_visibility" => "following"
1083 [activity, object] in query,
1086 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
1088 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1089 activity.recipients,
1097 defp restrict_replies(query, _), do: query
# With "exclude_reblogs" truthy (true, "true" or "1"), drops Announce activities.
1099 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do
1100 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1103 defp restrict_reblogs(query, _), do: query
# Hides activities from, or addressed to, users muted by "muting_user".
# "with_muted" truthy disables the filter.
1105 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
1107 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
# Prefer the preloaded mute list from opts; fall back to looking it up.
1108 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
1111 from([activity] in query,
1112 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1113 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
# The thread-mute filter uses the :thread_mute binding, so it is only applied
# when preloading was not skipped.
1116 unless opts["skip_preload"] do
1117 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1123 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by "blocking_user".
1125 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
# Prefer the preloaded block list from opts; fall back to looking it up.
1126 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1127 domain_blocks = user.domain_blocks || []
1129 following_ap_ids = User.get_friends_ap_ids(user)
# The filters below need the :object binding; join it only if it is missing.
1132 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1135 [activity, object: o] in query,
# Not authored by a blocked user, and not addressed to one.
1136 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1137 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
1140 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
# Domain checks compare the host part of an actor URL (third '/'-separated
# segment) against one list and allow exceptions listed in a second one.
# NOTE(review): presumably `domain_blocks` and `following_ap_ids` — confirm.
1147 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1155 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1164 defp restrict_blocked(query, _), do: query
# Drops rows whose "cc" contains the public address; a missing "cc" is treated
# as empty via `coalesce`.
# NOTE(review): presumably evaluated against the activity data — confirm.
1166 defp restrict_unlisted(query) do
1171 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1173 ^[Constants.as_public()]
1178 # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
1179 # the same for `restrict_media/2`, `restrict_replies/2`, `restrict_reblogs/2`
1180 # and `restrict_muted/2`
# With "pinned" truthy (true, "true" or "1") and a list of pinned ids supplied,
# keeps only activities whose id is in that list.
1182 defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => ids})
1183 when pinned in [true, "true", "1"] do
1184 from(activity in query, where: activity.id in ^ids)
1187 defp restrict_pinned(query, _), do: query
# Drops Announce activities matching the reblog-mute list of "muting_user".
# NOTE(review): presumably matched on the activity actor — confirm.
1189 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
# Prefer the preloaded list from opts; fall back to looking it up.
1190 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
1196 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1204 defp restrict_muted_reblogs(query, _), do: query
# With "instance" set, keeps activities authored by users whose nickname ends
# in "@<instance>" (SQL LIKE with a leading wildcard).
1206 defp restrict_instance(query, %{"instance" => instance}) do
1211 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1215 from(activity in query, where: activity.actor in ^users)
1218 defp restrict_instance(query, _), do: query
# Drops activities whose object is of type "Answer" (poll votes), unless
# "include_poll_votes" => true.
1220 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1222 defp exclude_poll_votes(query, _) do
# The filter needs the :object binding, so it is only applied when that binding exists.
1223 if has_named_binding?(query, :object) do
1224 from([activity, object: o] in query,
1225 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# With a binary "exclude_id", drops the activity with that id.
1232 defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
1233 from(activity in query, where: activity.id != ^id)
1236 defp exclude_id(query, _), do: query
# Preloads the activities' objects unless "skip_preload" => true.
1238 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
1240 defp maybe_preload_objects(query, _) do
1242 |> Activity.with_preloaded_object()
# Preloads the bookmark of `opts["user"]` for each activity unless
# "skip_preload" => true.
1245 defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
1247 defp maybe_preload_bookmarks(query, opts) do
1249 |> Activity.with_preloaded_bookmark(opts["user"])
# Preloads report notes only when explicitly requested with
# "preload_report_notes" => true.
1252 defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
1254 |> Activity.with_preloaded_report_notes()
1257 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for the muting user (falling back to
# `opts["user"]`) unless "skip_preload" => true.
1259 defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
1261 defp maybe_set_thread_muted_field(query, opts) do
1263 |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
# Orders by id according to the `:order` option (:desc or :asc).
# Note the atom key, unlike the string keys used by most other options here.
1266 defp maybe_order(query, %{order: :desc}) do
1268 |> order_by(desc: :id)
1271 defp maybe_order(query, %{order: :asc}) do
1273 |> order_by(asc: :id)
# No (or unknown) `:order`: no ordering is added.
1276 defp maybe_order(query, _), do: query
# Loads the muting user's relationship AP ids in one
# `User.outgoing_relationships_ap_ids/2` call and returns three option maps
# (blocked, muted, reblog-muted) with those lists pre-filled, so the
# restrict_* helpers can reuse them instead of looking them up themselves.
1278 defp fetch_activities_query_ap_ids_ops(opts) do
1279 source_user = opts["muting_user"]
1280 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
# Extra relationship types are added only when the blocking user is the same
# user as the muting user.
1282 ap_id_relationships =
1283 ap_id_relationships ++
1284 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1290 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
# `opts` is merged last, so values already supplied by the caller win over the preloaded ones.
1292 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1293 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1295 restrict_muted_reblogs_opts =
1296 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1298 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the activity query for `recipients` by threading a base query through
# every preload and restriction helper listed below; `opts` (default: empty
# map) drives each step.
# The blocked, muted and reblog-muted steps do not receive `opts` directly but
# the three option maps returned by fetch_activities_query_ap_ids_ops/1.
1301 def fetch_activities_query(recipients, opts \\ %{}) do
1302 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1303 fetch_activities_query_ap_ids_ops(opts)
# Instance setting read once per call; presumably a field of the `config`
# value handed to restrict_thread_visibility/3 below (binding not shown here).
1306 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1310 |> maybe_preload_objects(opts)
1311 |> maybe_preload_bookmarks(opts)
1312 |> maybe_preload_report_notes(opts)
1313 |> maybe_set_thread_muted_field(opts)
1314 |> maybe_order(opts)
# Recipient matching also receives the requesting user, opts["user"].
1315 |> restrict_recipients(recipients, opts["user"])
1316 |> restrict_replies(opts)
1317 |> restrict_tag(opts)
1318 |> restrict_tag_reject(opts)
1319 |> restrict_tag_all(opts)
1320 |> restrict_since(opts)
1321 |> restrict_local(opts)
1322 |> restrict_actor(opts)
1323 |> restrict_type(opts)
1324 |> restrict_state(opts)
1325 |> restrict_favorited_by(opts)
1326 |> restrict_blocked(restrict_blocked_opts)
1327 |> restrict_muted(restrict_muted_opts)
1328 |> restrict_media(opts)
1329 |> restrict_visibility(opts)
1330 |> restrict_thread_visibility(opts, config)
1331 |> restrict_reblogs(opts)
1332 |> restrict_pinned(opts)
1333 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1334 |> restrict_instance(opts)
1335 |> Activity.restrict_deactivated_users()
1336 |> exclude_poll_votes(opts)
1337 |> exclude_visibility(opts)
# Fetches one page of activities addressed to `recipients` or to any list
# membership of `opts["user"]` (as returned by Pleroma.List.memberships/1).
# `opts` defaults to an empty map; `pagination` defaults to :keyset and is
# forwarded to Pagination.fetch_paginated/3.
1340 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1341 list_memberships = Pleroma.List.memberships(opts["user"])
1343 fetch_activities_query(recipients ++ list_memberships, opts)
1344 |> Pagination.fetch_paginated(opts, pagination)
# Post-processes the fetched activities with the same memberships and user.
1346 |> maybe_update_cc(list_memberships, opts["user"])
1350 Fetches the activities a user has favourited, most recently favourited first.
# Starts from "Like" activities filtered by actor, joins the liked object and
# (judging by the select bindings) that object's activity, and returns that
# activity with its `object` field filled in.
# Ordering is by like id, descending. `params` defaults to an empty map and
# `pagination` to :keyset.
1352 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1353 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1355 |> Activity.Queries.by_actor()
1356 |> Activity.Queries.by_type("Like")
1357 |> Activity.with_joined_object()
1358 |> Object.with_joined_activity()
1359 |> select([_like, object, activity], %{activity | object: object})
1360 |> order_by([like, _, _], desc: like.id)
# "skip_order" => true is merged into the params handed to the paginator;
# presumably so it keeps the like-id ordering set above — TODO confirm.
1361 |> Pagination.fetch_paginated(
1362 Map.merge(params, %{"skip_order" => true}),
# Adds the user's own AP id to the "cc" of every activity that was delivered
# to them through a list.
#
#   * `activities`       - the activities to post-process
#   * `list_memberships` - list addresses the user belongs to
#   * user               - a `%User{}`; its `ap_id` is prepended to "cc"
#
# An activity is touched only when its "bcc" is a non-empty list that shares
# at least one entry with `list_memberships`. When there are no memberships,
# or no user struct, the activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent or a bare string; List.wrap/1 keeps the result a
        # proper list instead of producing `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
@doc """
Restricts `query` to activities that are addressed to one of `recipients`, or
that are public and addressed to one of `recipients_with_public`.

Both checks use the array-overlap operator (`&&`) on `activity.recipients`.
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [a],
    fragment("? && ?", a.recipients, ^recipients) or
      (fragment("? && ?", a.recipients, ^recipients_with_public) and
         ^public_address in a.recipients)
  )
end
# Fetches a page of activities using the bounded recipient filter.
# The base query is built with an empty recipient list, so all recipient
# matching comes from fetch_activities_bounded_query/3. `pagination` defaults
# to :keyset. Some parameters and trailing steps are not shown in this excerpt.
1394 def fetch_activities_bounded(
1396 recipients_with_public,
1398 pagination \\ :keyset
1400 fetch_activities_query([], opts)
1401 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1402 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` via Upload.store/2 and inserts an Object whose data is derived
# from the stored upload. `opts` (default `[]`) is passed to the store call,
# and `opts[:actor]` is written under the "actor" key of the object data (the
# surrounding lines are not part of this excerpt).
# Returns the Repo.insert/1 result on success.
1406 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1407 def upload(file, opts \\ []) do
1408 with {:ok, data} <- Upload.store(file, opts) do
1411 Map.put(data, "actor", opts[:actor])
1416 Repo.insert(%Object{data: obj_data})
# Extracts a URL string from an actor's "url" value, which may take several
# shapes: a binary is returned as-is, a map yields its "href" when that is a
# binary, and a list has its own clause (body not shown in this excerpt).
# Any other shape yields nil.
1420 @spec get_actor_url(any()) :: binary() | nil
1421 defp get_actor_url(url) when is_binary(url), do: url
1422 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1424 defp get_actor_url(url) when is_list(url) do
1430 defp get_actor_url(_url), do: nil
# Converts an actor document (`data`, string-keyed map) into the atom-keyed
# attribute map used for a remote user: profile URL, addresses, bio, actor
# type, public key, inboxes, nickname, and so on.
1432 defp object_to_user_data(data) do
# Avatar: only built when the actor has an icon URL.
# NOTE(review): data["icon"]["url"] is safe for a missing icon (nil["url"] is
# nil) but would raise if "icon" were a plain string — confirm upstream shape.
1434 data["icon"]["url"] &&
1437 "url" => [%{"href" => data["icon"]["url"]}]
# Banner: same pattern as the avatar, keyed on the "image" URL.
1441 data["image"]["url"] &&
1444 "url" => [%{"href" => data["image"]["url"]}]
# Profile fields: "PropertyValue" attachments reduced to their name/value.
# NOTE(review): the filter clause only matches maps with a "type" key; an
# attachment without one raises FunctionClauseError.
1449 |> Map.get("attachment", [])
1450 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1451 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: tags of type "Emoji" collected into a shortcode => URL map,
# with surrounding colons stripped from the name.
# NOTE(review): the reducer requires both "icon" => %{"url" => _} and "name";
# an Emoji tag missing either raises FunctionClauseError.
1455 |> Map.get("tag", [])
1457 %{"type" => "Emoji"} -> true
1460 |> Enum.reduce(%{}, fn %{"icon" => %{"url" => url}, "name" => name}, acc ->
1461 Map.put(acc, String.trim(name, ":"), url)
# `locked` is read from the raw document; the values below it are read after
# Transmogrifier.maybe_fix_user_object/1 has rewritten `data`.
1464 locked = data["manuallyApprovesFollowers"] || false
1465 data = Transmogrifier.maybe_fix_user_object(data)
1466 discoverable = data["discoverable"] || false
1467 invisible = data["invisible"] || false
1468 actor_type = data["type"] || "Person"
# The PEM is taken only when "publicKey" is a map holding a binary PEM.
1471 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1472 data["publicKey"]["publicKeyPem"]
# Same defensive shape check for the optional shared inbox.
1478 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1479 data["endpoints"]["sharedInbox"]
1486 uri: get_actor_url(data["url"]),
1492 discoverable: discoverable,
1493 invisible: invisible,
1496 follower_address: data["followers"],
1497 following_address: data["following"],
1498 bio: data["summary"],
1499 actor_type: actor_type,
1500 also_known_as: Map.get(data, "alsoKnownAs", []),
1501 public_key: public_key,
1502 inbox: data["inbox"],
1503 shared_inbox: shared_inbox
1506 # nickname can be nil because of virtual actors
# With a preferredUsername the nickname is "<preferredUsername>@<host of id>";
# without one it is explicitly set to nil.
1508 if data["preferredUsername"] do
1512 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1515 Map.put(user_data, :nickname, nil)
# Fetches the user's remote following and follower collections and derives
# follow statistics from them: the counters come from each collection's
# "totalItems" (normalised by normalize_counter/1) and the hide_* flags from
# collection_private/1.
# `user` only needs `following_address` and `follower_address` fields.
# An `{:error, _}` from any step is returned unchanged.
1521 def fetch_follow_information_for_user(user) do
1522 with {:ok, following_data} <-
1523 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1524 {:ok, hide_follows} <- collection_private(following_data),
1525 {:ok, followers_data} <-
1526 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1527 {:ok, hide_followers} <- collection_private(followers_data) do
1530 hide_follows: hide_follows,
1531 follower_count: normalize_counter(followers_data["totalItems"]),
1532 following_count: normalize_counter(following_data["totalItems"]),
1533 hide_followers: hide_followers
1536 {:error, _} = e -> e
# Coerces a remote "totalItems" value to an integer counter: integers pass
# through untouched, everything else (nil, strings, floats, ...) becomes 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Enriches `user_data` with remote follower/following statistics when three
# checks pass: the :external_user_synchronization instance setting is on, the
# user type is "Person" or "Service", and both collection addresses are set.
# A fetch failure is logged with the message built on the last line below.
1544 def maybe_update_follow_information(user_data) do
1545 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# NOTE(review): this reads user_data[:type], while the visible keys built by
# object_to_user_data/1 include :actor_type and no :type — if no caller sets
# :type this check always fails; verify.
1546 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1548 {:collections_available,
1549 !!(user_data[:following_address] && user_data[:follower_address])},
1551 fetch_follow_information_for_user(user_data) do
# The fetched values are merged into user_data[:info] and stored under :info,
# not at the top level of the map.
1552 info = Map.merge(user_data[:info] || %{}, info)
1555 |> Map.put(:info, info)
1557 {:user_type_check, false} ->
1560 {:collections_available, false} ->
1563 {:enabled, false} ->
1568 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a follow collection is private, returning `{:ok, boolean}`
# or an error tuple.
# Clause 1: "first" is an embedded page map of type CollectionPage or
# OrderedCollectionPage (result line not shown in this excerpt).
1575 defp collection_private(%{"first" => %{"type" => type}})
1576 when type in ["CollectionPage", "OrderedCollectionPage"],
# Clause 2: "first" is any other value and is passed to the fetcher as an id.
1579 defp collection_private(%{"first" => first}) do
1580 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1581 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# A 401/403 response for the first page is treated as "private", not as a
# failure; any other error tuple is passed through unchanged.
1584 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1585 {:error, _} = e -> e
# Clause 3: anything without a "first" key is considered private.
1590 defp collection_private(_data), do: {:ok, true}
# Runs an actor document through MRF.filter/1 and then converts the filtered
# document with object_to_user_data/1. Both steps must return `{:ok, data}`
# to continue (success and failure results are not shown in this excerpt).
1592 def user_data_from_user_object(data) do
1593 with {:ok, data} <- MRF.filter(data),
1594 {:ok, data} <- object_to_user_data(data) do
# Fetches the remote actor at `ap_id`, converts it with
# user_data_from_user_object/1 and passes the result through
# maybe_update_follow_information/1.
1601 def fetch_and_prepare_user_from_ap_id(ap_id) do
1602 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1603 {:ok, data} <- user_data_from_user_object(data),
1604 data <- maybe_update_follow_information(data) do
# A deleted actor is logged at debug level; the other failure branch below
# logs the same message at error level.
1607 {:error, "Object has been deleted"} = e ->
1608 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1612 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Creates or refreshes the local record for the remote actor at `ap_id`.
# A cached user that is not yet AP-enabled is handed to
# Transmogrifier.upgrade_user_from_ap_id/1. Otherwise the actor is fetched and
# applied through User.remote_user_changeset; the visible lines show one path
# that updates a user and refreshes the cache and one that builds a changeset
# without existing data (branch conditions are not shown in this excerpt).
1617 def make_user_from_ap_id(ap_id) do
1618 user = User.get_cached_by_ap_id(ap_id)
1620 if user && !User.ap_enabled?(user) do
1621 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1623 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1626 |> User.remote_user_changeset(data)
1627 |> User.update_and_set_cache()
1630 |> User.remote_user_changeset()
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found.

Returns the result of `make_user_from_ap_id/1`, or
`{:error, "No AP id in WebFinger"}` when the lookup fails or yields no AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Thread containment check: true only when `user` may see the entire thread
# the activity belongs to, so activities from broken threads can be filtered.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity; currently it only applies the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Builds a query for direct messages: "Create" activities with "direct"
# visibility, ordered by ascending activity id. The head of the pipeline is
# not shown in this excerpt.
1658 def fetch_direct_messages_query do
1660 |> restrict_type(%{"type" => "Create"})
1661 |> restrict_visibility(%{visibility: "direct"})
1662 |> order_by([activity], asc: activity.id)