1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = User.get_cached_by_ap_id(data["actor"])
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
47 user -> User.following?(user, actor)
# Recipients of a "Create" activity: every address listed in "to", "cc" and
# "bcc" plus the actor itself, with duplicates removed.
#
# A missing (or nil) "actor" contributes nothing. Previously the `[]` default
# was wrapped into `[[]]`, which leaked an empty list into the recipients.
#
# NOTE(review): the return tuple is not visible in this listing; its shape is
# restored from the `{recipients, _, _}` matches in persist/2 and insert/4 --
# confirm that the second and third elements are `to` and `cc`.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  actor =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()

  {recipients, to, cc}
end
# Fallback for every other activity type: the recipients are the plain
# concatenation of "to", "cc" and "bcc" (no de-duplication, actor not added).
#
# NOTE(review): the return tuple is not visible in this listing; its shape is
# restored from the `{recipients, _, _}` matches in persist/2 and insert/4 --
# confirm that the second and third elements are `to` and `cc`.
defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], fn key -> Map.get(data, key, []) end)

  {Enum.concat([to, cc, bcc]), to, cc}
end
# Tells insert/4 whether an activity by `actor` may be stored:
#
#   * no actor given (`nil`)                     -> true
#   * actor resolves to a non-deactivated user   -> true
#   * actor is deactivated or cannot be resolved -> false
#
# Matching on the `%User{}` struct avoids calling `.deactivated` on `nil`
# when the actor is unknown, which used to raise instead of returning false.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  match?(%User{deactivated: false}, User.get_cached_by_ap_id(actor))
end
# Checks the length of an object's "content" against the configured
# `[:instance, :remote_limit]`. Activities without a (non-nil) content field
# always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  not (String.length(content) > max_length)
end

defp check_remote_limit(_), do: true
@doc """
Increments `actor`'s note count when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Decrements `actor`'s note count when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
99 def increase_replies_count_if_reply(%{
100 "object" => %{"inReplyTo" => reply_ap_id} = object,
103 if is_public?(object) do
104 Object.increase_replies_count(reply_ap_id)
108 def increase_replies_count_if_reply(_create_data), do: :noop
@doc """
Decrements the replies counter of the object that the given object replies to.

Only acts when the reply's data is public; for a non-public reply the result
is `nil`. Anything that is not an `%Object{}` carrying an "inReplyTo" entry
yields `:noop`.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply_data}) do
  if is_public?(reply_data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
120 def increase_poll_votes_if_vote(%{
121 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
124 Object.increase_vote_count(reply_ap_id, name)
127 def increase_poll_votes_if_vote(_create_data), do: :noop
129 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
130 def persist(object, meta) do
131 with local <- Keyword.fetch!(meta, :local),
132 {recipients, _, _} <- get_recipients(object),
134 Repo.insert(%Activity{
137 recipients: recipients,
138 actor: object["actor"]
140 {:ok, activity, meta}
144 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
145 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
146 with nil <- Activity.normalize(map),
147 map <- lazy_put_activity_defaults(map, fake),
148 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
149 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
150 {:ok, map} <- MRF.filter(map),
151 {recipients, _, _} = get_recipients(map),
152 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
153 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
154 {:ok, map, object} <- insert_full_object(map) do
160 recipients: recipients
163 |> maybe_create_activity_expiration()
165 # Splice in the child object if we have one.
167 if not is_nil(object) do
168 Map.put(activity, :object, object)
173 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
175 Notification.create_notifications(activity)
177 conversation = create_or_bump_conversation(activity, map["actor"])
178 participations = get_participations(conversation)
180 stream_out_participations(participations)
183 %Activity{} = activity ->
186 {:fake, true, map, recipients} ->
187 activity = %Activity{
191 recipients: recipients,
195 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
203 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
204 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
209 defp maybe_create_activity_expiration(result), do: result
211 defp create_or_bump_conversation(activity, actor) do
212 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
213 %User{} = user <- User.get_cached_by_ap_id(actor),
214 Participation.mark_as_read(user, conversation) do
# Returns the participations of a conversation given as `{:ok, conversation}`,
# forcing a reload of the association. Any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)

  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
227 def stream_out_participations(participations) do
230 |> Repo.preload(:user)
232 Streamer.stream("participation", participations)
235 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
236 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
237 conversation = Repo.preload(conversation, :participations),
239 fetch_latest_activity_id_for_context(conversation.ap_id, %{
241 "blocking_user" => user
243 if last_activity_id do
244 stream_out_participations(conversation.participations)
249 def stream_out_participations(_, _), do: :noop
251 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
252 when data_type in ["Create", "Announce", "Delete"] do
254 |> Topics.get_activity_topics()
255 |> Streamer.stream(activity)
258 def stream_out(_activity) do
262 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
263 def create(params, fake \\ false) do
264 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
269 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
274 quick_insert? = Config.get([:env]) == :benchmark
278 %{to: to, actor: actor, published: published, context: context, object: object},
281 {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 _ <- increase_poll_votes_if_vote(create_data),
285 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
286 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
313 {:ok, activity} <- insert(listen_data, local),
314 :ok <- maybe_federate(activity) do
@doc """
Builds and inserts an "Accept" activity from `params`.

Delegates to `accept_or_reject/2`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
@doc """
Builds and inserts a "Reject" activity from `params`.

Delegates to `accept_or_reject/2`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
329 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
330 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
331 local = Map.get(params, :local, true)
332 activity_id = Map.get(params, :activity_id, nil)
335 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
336 |> Utils.maybe_put("id", activity_id),
337 {:ok, activity} <- insert(data, local),
338 :ok <- maybe_federate(activity) do
343 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
344 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
345 local = !(params[:local] == false)
346 activity_id = params[:activity_id]
355 data <- Utils.maybe_put(data, "id", activity_id),
356 {:ok, activity} <- insert(data, local),
357 :ok <- maybe_federate(activity) do
362 @spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
363 {:ok, Activity.t(), Object.t()} | {:error, any()}
364 def react_with_emoji(user, object, emoji, options \\ []) do
365 with {:ok, result} <-
366 Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
371 defp do_react_with_emoji(user, object, emoji, options) do
372 with local <- Keyword.get(options, :local, true),
373 activity_id <- Keyword.get(options, :activity_id, nil),
374 true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
375 reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
376 {:ok, activity} <- insert(reaction_data, local),
377 {:ok, object} <- add_emoji_reaction_to_object(activity, object),
378 :ok <- maybe_federate(activity) do
379 {:ok, activity, object}
381 false -> {:error, false}
382 {:error, error} -> Repo.rollback(error)
386 @spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
387 {:ok, Activity.t(), Object.t()} | {:error, any()}
388 def unreact_with_emoji(user, reaction_id, options \\ []) do
389 with {:ok, result} <-
390 Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
395 defp do_unreact_with_emoji(user, reaction_id, options) do
396 with local <- Keyword.get(options, :local, true),
397 activity_id <- Keyword.get(options, :activity_id, nil),
398 user_ap_id <- user.ap_id,
399 %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
400 object <- Object.normalize(reaction_activity),
401 unreact_data <- make_undo_data(user, reaction_activity, activity_id),
402 {:ok, activity} <- insert(unreact_data, local),
403 {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
404 :ok <- maybe_federate(activity) do
405 {:ok, activity, object}
407 {:error, error} -> Repo.rollback(error)
411 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
412 @spec like(User.t(), Object.t(), String.t() | nil, boolean()) ::
413 {:ok, Activity.t(), Object.t()} | {:error, any()}
414 def like(user, object, activity_id \\ nil, local \\ true) do
415 with {:ok, result} <- Repo.transaction(fn -> do_like(user, object, activity_id, local) end) do
421 %User{ap_id: ap_id} = user,
422 %Object{data: %{"id" => _}} = object,
426 with nil <- get_existing_like(ap_id, object),
427 like_data <- make_like_data(user, object, activity_id),
428 {:ok, activity} <- insert(like_data, local),
429 {:ok, object} <- add_like_to_object(activity, object),
430 :ok <- maybe_federate(activity) do
431 {:ok, activity, object}
433 %Activity{} = activity ->
434 {:ok, activity, object}
441 @spec unlike(User.t(), Object.t(), String.t() | nil, boolean()) ::
442 {:ok, Activity.t(), Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
443 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
444 with {:ok, result} <-
445 Repo.transaction(fn -> do_unlike(actor, object, activity_id, local) end) do
450 defp do_unlike(actor, object, activity_id, local) do
451 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
452 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
453 {:ok, unlike_activity} <- insert(unlike_data, local),
454 {:ok, _activity} <- Repo.delete(like_activity),
455 {:ok, object} <- remove_like_from_object(like_activity, object),
456 :ok <- maybe_federate(unlike_activity) do
457 {:ok, unlike_activity, like_activity, object}
460 {:error, error} -> Repo.rollback(error)
464 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
465 {:ok, Activity.t(), Object.t()} | {:error, any()}
467 %User{ap_id: _} = user,
468 %Object{data: %{"id" => _}} = object,
473 with {:ok, result} <-
474 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
479 defp do_announce(user, object, activity_id, local, public) do
480 with true <- is_announceable?(object, user, public),
481 announce_data <- make_announce_data(user, object, activity_id, public),
482 {:ok, activity} <- insert(announce_data, local),
483 {:ok, object} <- add_announce_to_object(activity, object),
484 :ok <- maybe_federate(activity) do
485 {:ok, activity, object}
487 false -> {:error, false}
488 {:error, error} -> Repo.rollback(error)
492 @spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
493 {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
500 with {:ok, result} <-
501 Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
506 defp do_unannounce(actor, object, activity_id, local) do
507 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
508 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
509 {:ok, unannounce_activity} <- insert(unannounce_data, local),
510 :ok <- maybe_federate(unannounce_activity),
511 {:ok, _activity} <- Repo.delete(announce_activity),
512 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
513 {:ok, unannounce_activity, object}
516 {:error, error} -> Repo.rollback(error)
520 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
521 {:ok, Activity.t()} | {:error, any()}
522 def follow(follower, followed, activity_id \\ nil, local \\ true) do
523 with {:ok, result} <-
524 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
529 defp do_follow(follower, followed, activity_id, local) do
530 with data <- make_follow_data(follower, followed, activity_id),
531 {:ok, activity} <- insert(data, local),
532 :ok <- maybe_federate(activity) do
535 {:error, error} -> Repo.rollback(error)
539 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
540 {:ok, Activity.t()} | nil | {:error, any()}
541 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
542 with {:ok, result} <-
543 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
548 defp do_unfollow(follower, followed, activity_id, local) do
549 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
550 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
551 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
552 {:ok, activity} <- insert(unfollow_data, local),
553 :ok <- maybe_federate(activity) do
557 {:error, error} -> Repo.rollback(error)
561 @spec delete(User.t() | Object.t(), keyword()) :: {:ok, User.t() | Object.t()} | {:error, any()}
562 def delete(entity, options \\ []) do
563 with {:ok, result} <- Repo.transaction(fn -> do_delete(entity, options) end) do
568 defp do_delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _) do
570 "to" => [follower_address],
573 "object" => %{"type" => "Person", "id" => ap_id}
575 {:ok, activity} <- insert(data, true, true, true),
576 :ok <- maybe_federate(activity) do
581 defp do_delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
582 local = Keyword.get(options, :local, true)
583 activity_id = Keyword.get(options, :activity_id, nil)
584 actor = Keyword.get(options, :actor, actor)
586 user = User.get_cached_by_ap_id(actor)
587 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
589 with create_activity <- Activity.get_create_by_object_ap_id(id),
596 "deleted_activity_id" => create_activity && create_activity.id
598 |> maybe_put("id", activity_id),
599 {:ok, activity} <- insert(data, local, false),
600 {:ok, object, _create_activity} <- Object.delete(object),
601 stream_out_participations(object, user),
602 _ <- decrease_replies_count_if_reply(object),
603 {:ok, _actor} <- decrease_note_count_if_public(user, object),
604 :ok <- maybe_federate(activity) do
612 defp do_delete(%Object{data: %{"type" => "Tombstone", "id" => ap_id}}, _) do
615 |> Activity.Queries.by_object_id()
616 |> Activity.Queries.by_type("Delete")
622 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
623 {:ok, Activity.t()} | {:error, any()}
624 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
625 with {:ok, result} <-
626 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
631 defp do_block(blocker, blocked, activity_id, local) do
632 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
633 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
635 if unfollow_blocked do
636 follow_activity = fetch_latest_follow(blocker, blocked)
637 if follow_activity, do: unfollow(blocker, blocked, nil, local)
640 with true <- outgoing_blocks,
641 block_data <- make_block_data(blocker, blocked, activity_id),
642 {:ok, activity} <- insert(block_data, local),
643 :ok <- maybe_federate(activity) do
646 {:error, error} -> Repo.rollback(error)
650 @spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
651 {:ok, Activity.t()} | {:error, any()} | nil
652 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
653 with {:ok, result} <-
654 Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
659 defp do_unblock(blocker, blocked, activity_id, local) do
660 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
661 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
662 {:ok, activity} <- insert(unblock_data, local),
663 :ok <- maybe_federate(activity) do
667 {:error, error} -> Repo.rollback(error)
671 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
681 # only accept false as false value
682 local = !(params[:local] == false)
683 forward = !(params[:forward] == false)
685 additional = params[:additional] || %{}
689 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
691 Map.merge(additional, %{"to" => [], "cc" => []})
694 with flag_data <- make_flag_data(params, additional),
695 {:ok, activity} <- insert(flag_data, local),
696 {:ok, stripped_activity} <- strip_report_status_data(activity),
697 :ok <- maybe_federate(stripped_activity) do
698 User.all_superusers()
699 |> Enum.filter(fn user -> not is_nil(user.email) end)
700 |> Enum.each(fn superuser ->
702 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
703 |> Pleroma.Emails.Mailer.deliver_async()
710 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
711 def move(%User{} = origin, %User{} = target, local \\ true) do
714 "actor" => origin.ap_id,
715 "object" => origin.ap_id,
716 "target" => target.ap_id
719 with true <- origin.ap_id in target.also_known_as,
720 {:ok, activity} <- insert(params, local) do
721 maybe_federate(activity)
723 BackgroundWorker.enqueue("move_following", %{
724 "origin_id" => origin.id,
725 "target_id" => target.id
730 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
735 defp fetch_activities_for_context_query(context, opts) do
736 public = [Constants.as_public()]
740 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
743 from(activity in Activity)
744 |> maybe_preload_objects(opts)
745 |> maybe_preload_bookmarks(opts)
746 |> maybe_set_thread_muted_field(opts)
747 |> restrict_blocked(opts)
748 |> restrict_recipients(recipients, opts["user"])
752 "?->>'type' = ? and ?->>'context' = ?",
759 |> exclude_poll_votes(opts)
761 |> order_by([activity], desc: activity.id)
764 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
765 def fetch_activities_for_context(context, opts \\ %{}) do
767 |> fetch_activities_for_context_query(opts)
771 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
772 FlakeId.Ecto.CompatType.t() | nil
773 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
775 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
@doc """
Fetches activities addressed to the public collection.

The "user" entry is removed from `opts` before the query is built. The query
is then narrowed by `restrict_unlisted/1` and paginated with
`Pagination.fetch_paginated/3` using the given `pagination` mode
(`:keyset` by default).
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, "user")
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
791 @valid_visibilities ~w[direct unlisted public private]
793 defp restrict_visibility(query, %{visibility: visibility})
794 when is_list(visibility) do
795 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
801 "activity_visibility(?, ?, ?) = ANY (?)",
811 Logger.error("Could not restrict visibility to #{visibility}")
815 defp restrict_visibility(query, %{visibility: visibility})
816 when visibility in @valid_visibilities do
820 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
824 defp restrict_visibility(_query, %{visibility: visibility})
825 when visibility not in @valid_visibilities do
826 Logger.error("Could not restrict visibility to #{visibility}")
829 defp restrict_visibility(query, _visibility), do: query
831 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
832 when is_list(visibility) do
833 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
838 "activity_visibility(?, ?, ?) = ANY (?)",
846 Logger.error("Could not exclude visibility to #{visibility}")
851 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
852 when visibility in @valid_visibilities do
857 "activity_visibility(?, ?, ?) = ?",
866 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
867 when visibility not in @valid_visibilities do
868 Logger.error("Could not exclude visibility to #{visibility}")
872 defp exclude_visibility(query, _visibility), do: query
874 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
877 defp restrict_thread_visibility(
879 %{"user" => %User{skip_thread_containment: true}},
884 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
887 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
891 defp restrict_thread_visibility(query, _, _), do: query
893 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
896 |> Map.put("user", reading_user)
897 |> Map.put("actor_id", user.ap_id)
900 user_activities_recipients(%{
901 "godmode" => params["godmode"],
902 "reading_user" => reading_user
905 fetch_activities(recipients, params)
909 def fetch_user_activities(user, reading_user, params \\ %{}) do
912 |> Map.put("type", ["Create", "Announce"])
913 |> Map.put("user", reading_user)
914 |> Map.put("actor_id", user.ap_id)
915 |> Map.put("pinned_activity_ids", user.pinned_activities)
918 if User.blocks?(reading_user, user) do
922 |> Map.put("blocking_user", reading_user)
923 |> Map.put("muting_user", reading_user)
927 user_activities_recipients(%{
928 "godmode" => params["godmode"],
929 "reading_user" => reading_user
932 fetch_activities(recipients, params)
936 def fetch_statuses(reading_user, params) do
939 |> Map.put("type", ["Create", "Announce"])
942 user_activities_recipients(%{
943 "godmode" => params["godmode"],
944 "reading_user" => reading_user
947 fetch_activities(recipients, params, :offset)
951 defp user_activities_recipients(%{"godmode" => true}) do
955 defp user_activities_recipients(%{"reading_user" => reading_user}) do
957 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
959 [Constants.as_public()]
# Keeps only activities whose id is greater than "since_id".
# An empty-string or absent "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
# Drops activities whose joined object carries any of the "tag_reject" tags.
# The filter reads the object binding, so combining it with "skip_preload"
# raises. An empty or non-list "tag_reject" is a no-op.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
# Keeps only activities whose joined object carries every tag in "tag_all".
# The filter reads the object binding, so combining it with "skip_preload"
# raises. An empty or non-list "tag_all" is a no-op.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
# Keeps only activities whose joined object is tagged with "tag":
#   * a list   -> at least one of the given tags must be present
#   * a binary -> that single tag must be present
# The filter reads the object binding, so combining it with "skip_preload"
# raises. Any other options leave the query untouched.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
# Limits the query to activities whose recipients overlap `recipients`.
#   * empty recipient list -> no restriction
#   * no user              -> overlap check only
#   * user given           -> overlap check OR activities authored by the user
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
# With "local_only" => true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
# With an "actor_id" option, keeps only activities authored by that actor.
defp restrict_actor(query, %{"actor_id" => actor_ap_id}),
  do: where(query, [activity], activity.actor == ^actor_ap_id)

defp restrict_actor(query, _), do: query
# Filters by the activity's "type":
#   * a binary          -> exact match
#   * any other "type"  -> passed to `= ANY(?)`, i.e. treated as a set of
#                          accepted types
defp restrict_type(query, %{"type" => type_name}) when is_binary(type_name) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type_name))
end

defp restrict_type(query, %{"type" => type_names}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type_names))
end

defp restrict_type(query, _), do: query
# With a "state" option, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _), do: query
# With a "favorited_by" AP id, keeps only activities whose joined object lists
# that id under "likes".
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
# With a truthy "only_media" flag, keeps only activities whose joined object
# has an "attachment" different from the empty list.
#
# The flag is honoured for `true`, "true" and "1" -- the same set that
# restrict_muted/2 accepts for "with_muted". Previously a boolean `true` was
# silently ignored. The filter reads the object binding, so combining
# "only_media" with "skip_preload" raises.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
  from(
    [_activity, object] in query,
    where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
# With a truthy "exclude_replies" flag, keeps only activities whose joined
# object has no "inReplyTo".
#
# The flag is honoured for `true`, "true" and "1" -- the same set that
# restrict_muted/2 accepts for "with_muted". Previously a boolean `true` was
# silently ignored.
defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
  from(
    [_activity, object] in query,
    where: fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, _), do: query
# With a truthy "exclude_reblogs" flag, drops "Announce" activities.
#
# The flag is honoured for `true`, "true" and "1" -- the same set that
# restrict_muted/2 accepts for "with_muted". Previously a boolean `true` was
# silently ignored.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do
  from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
1098 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
1100 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
1101 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
1104 from([activity] in query,
1105 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1106 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
1109 unless opts["skip_preload"] do
1110 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1116 defp restrict_muted(query, _), do: query
1118 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
1119 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1120 domain_blocks = user.domain_blocks || []
1122 following_ap_ids = User.get_friends_ap_ids(user)
1125 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1128 [activity, object: o] in query,
1129 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1130 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
1133 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1140 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1148 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1157 defp restrict_blocked(query, _), do: query
1159 defp restrict_unlisted(query) do
1164 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1166 ^[Constants.as_public()]
# When "pinned" is the string "true", keeps only activities whose id is in
# "pinned_activity_ids". Any other options leave the query untouched.
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}),
  do: where(query, [activity], activity.id in ^pinned_ids)

defp restrict_pinned(query, _), do: query
1177 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
1178 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
1184 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1192 defp restrict_muted_reblogs(query, _), do: query
1194 defp restrict_instance(query, %{"instance" => instance}) do
1199 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1203 from(activity in query, where: activity.actor in ^users)
1206 defp restrict_instance(query, _), do: query
# Hides poll votes (objects of type "Answer") unless "include_poll_votes" is
# true. The filter needs the named :object binding; when the query does not
# have it, the query is returned unchanged.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(
      query,
      [_activity, object: o],
      fragment("not(?->>'type' = ?)", o.data, "Answer")
    )
  else
    query
  end
end
# With a binary "exclude_id", removes the activity with that id from the
# result set.
defp exclude_id(query, %{"exclude_id" => excluded_id}) when is_binary(excluded_id),
  do: where(query, [activity], activity.id != ^excluded_id)

defp exclude_id(query, _), do: query
# Applies Activity.with_preloaded_object/1 unless "skip_preload" is true.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
# Applies Activity.with_preloaded_bookmark/2 for opts["user"] unless
# "skip_preload" is true.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
# Applies Activity.with_preloaded_report_notes/1 only when
# "preload_report_notes" is true.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
# Applies Activity.with_set_thread_muted_field/2 unless "skip_preload" is
# true. The user passed along is opts["muting_user"], falling back to
# opts["user"].
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]

  Activity.with_set_thread_muted_field(query, muting_user)
end
# Orders by id when an `:order` option (atom key) of `:desc` or `:asc` is
# present; otherwise the query is returned unchanged.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
# Builds the three option maps handed to the block, mute and reblog-mute
# restrictions. Relationship AP ids are looked up in a single call to
# `User.outgoing_relationships_ap_ids/2` and merged *underneath* `opts`, so a
# value the caller already supplied under the same key wins.
# Returns {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}.
1266 defp fetch_activities_query_ap_ids_ops(opts) do
1267 source_user = opts["muting_user"]
# Mute and reblog-mute relationships are only requested when a muting user is set.
1268 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
# Further relationships are appended only when a blocking user is given and it
# is the same user as the muting user.
1270 ap_id_relationships =
1271 ap_id_relationships ++
1272 if opts["blocking_user"] && opts["blocking_user"] == source_user do
1278 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
# A relationship that was not requested yields nil under its key here.
1280 restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts)
1281 restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts)
1283 restrict_muted_reblogs_opts =
1284 Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts)
1286 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the activity query for the given recipients by piping through every
# preload, ordering, restriction and exclusion step below.
#
# `recipients` is forwarded to restrict_recipients/3 together with opts["user"];
# `opts` is a map (string keys for most options, the atom key :order for
# maybe_order/2) and defaults to an empty map.
1289 def fetch_activities_query(recipients, opts \\ %{}) do
# Block/mute/reblog-mute restrictions get their own option maps that carry the
# relationship AP ids looked up by the helper.
1290 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1291 fetch_activities_query_ap_ids_ops(opts)
# Instance setting read here and handed to restrict_thread_visibility/3 via `config`.
1294 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
# Preloads and ordering.
1298 |> maybe_preload_objects(opts)
1299 |> maybe_preload_bookmarks(opts)
1300 |> maybe_preload_report_notes(opts)
1301 |> maybe_set_thread_muted_field(opts)
1302 |> maybe_order(opts)
# Restrictions driven by the caller's options.
1303 |> restrict_recipients(recipients, opts["user"])
1304 |> restrict_tag(opts)
1305 |> restrict_tag_reject(opts)
1306 |> restrict_tag_all(opts)
1307 |> restrict_since(opts)
1308 |> restrict_local(opts)
1309 |> restrict_actor(opts)
1310 |> restrict_type(opts)
1311 |> restrict_state(opts)
1312 |> restrict_favorited_by(opts)
# These two use the option maps enriched with relationship AP ids.
1313 |> restrict_blocked(restrict_blocked_opts)
1314 |> restrict_muted(restrict_muted_opts)
1315 |> restrict_media(opts)
1316 |> restrict_visibility(opts)
1317 |> restrict_thread_visibility(opts, config)
1318 |> restrict_replies(opts)
1319 |> restrict_reblogs(opts)
1320 |> restrict_pinned(opts)
1321 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1322 |> restrict_instance(opts)
1323 |> Activity.restrict_deactivated_users()
1324 |> exclude_poll_votes(opts)
1325 |> exclude_visibility(opts)
# Fetches a page of activities for `recipients`, extended with the list
# memberships of opts["user"]. `pagination` defaults to :keyset and is passed
# straight to `Pagination.fetch_paginated/3`.
1328 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
# Memberships are appended to the recipients and reused below for the "cc" fix-up.
1329 list_memberships = Pleroma.List.memberships(opts["user"])
1331 fetch_activities_query(recipients ++ list_memberships, opts)
1332 |> Pagination.fetch_paginated(opts, pagination)
# Post-processes the fetched activities with the memberships and opts["user"].
1334 |> maybe_update_cc(list_memberships, opts["user"])
1338 Fetches the activities favourited by a user, sorted by the order in which they were added to favourites.
# Takes a user, a params map and a pagination type; returns a list of activities.
1340 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1341 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
# Select the "Like" activities of the actor.
1343 |> Activity.Queries.by_actor()
1344 |> Activity.Queries.by_type("Like")
# Join the liked object, then the activity joined from that object; the select
# returns that activity with the object attached instead of the Like itself.
1345 |> Activity.with_joined_object()
1346 |> Object.with_joined_activity()
1347 |> select([_like, object, activity], %{activity | object: object})
# Order by the id of the Like, descending, not by the id of the returned activity.
1348 |> order_by([like, _, _], desc: like.id)
# "skip_order" => true is merged into the params — presumably so pagination
# keeps the ordering set above; confirm in Pagination.fetch_paginated.
1349 |> Pagination.fetch_paginated(
1350 Map.merge(params, %{"skip_order" => true}),
# Adds the user's own AP id to the "cc" of every activity whose "bcc" contains
# at least one of the user's list memberships. Activities without a non-empty
# "bcc" list, or whose "bcc" matches no membership, are returned unchanged.
#
# Only applies when `list_memberships` is a non-empty list and a %User{} is
# given; otherwise the second clause returns `activities` untouched.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or not a list; List.wrap/1 keeps the result a
        # proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities addressed to the given recipients. An activity
# qualifies when its recipients overlap `recipients`, or when they overlap
# `recipients_with_public` and also include the public address constant.
1373 def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1374 from(activity in query,
# `&&` inside the fragments is the SQL array-overlap operator.
1376 fragment("? && ?", activity.recipients, ^recipients) or
1377 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1378 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities using the bounded recipients condition from
# fetch_activities_bounded_query/3. `pagination` defaults to :keyset.
1382 def fetch_activities_bounded(
1384 recipients_with_public,
1386 pagination \\ :keyset
# The base query is built with an empty recipient list; recipient filtering is
# added by the bounded query in the next step.
1388 fetch_activities_query([], opts)
1389 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1390 |> Pagination.fetch_paginated(opts, pagination)
# Stores an upload and inserts an Object row for it.
# Returns {:ok, object} or {:error, reason} per the spec.
1394 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1395 def upload(file, opts \\ []) do
1396 with {:ok, data} <- Upload.store(file, opts) do
# The actor taken from opts[:actor] is recorded on the stored data under "actor".
1399 Map.put(data, "actor", opts[:actor])
1404 Repo.insert(%Object{data: obj_data})
# Extracts a URL string from the "url" value of an actor object, which may be a
# plain string, a map with an "href", or a list. Returns nil for any other shape.
1408 @spec get_actor_url(any()) :: binary() | nil
1409 defp get_actor_url(url) when is_binary(url), do: url
1410 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
# List-valued URLs are handled by this clause.
1412 defp get_actor_url(url) when is_list(url) do
1418 defp get_actor_url(_url), do: nil
# Maps a remote actor object (string-keyed map) to a map of user attributes
# with atom keys.
1420 defp object_to_user_data(data) do
# Image entry built from the actor's icon URL, only when one is present.
# NOTE(review): `data["icon"]["url"]` assumes "icon" is a map or absent —
# confirm list- or string-valued icons are normalised before this point.
1422 data["icon"]["url"] &&
1425 "url" => [%{"href" => data["icon"]["url"]}]
# Same construction for the actor's "image".
1429 data["image"]["url"] &&
1432 "url" => [%{"href" => data["image"]["url"]}]
# "PropertyValue" attachments reduced to their "name"/"value" pairs.
# NOTE(review): the filter function has a single clause that only matches maps
# with a "type" key, so an attachment without one raises FunctionClauseError;
# a catch-all clause returning false would tolerate such input.
1437 |> Map.get("attachment", [])
1438 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1439 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# `locked` is read before the object is passed through
# Transmogrifier.maybe_fix_user_object/1; the values below are read after it.
1441 locked = data["manuallyApprovesFollowers"] || false
1442 data = Transmogrifier.maybe_fix_user_object(data)
1443 discoverable = data["discoverable"] || false
1444 invisible = data["invisible"] || false
# Actors without a "type" are treated as "Person".
1445 actor_type = data["type"] || "Person"
1449 uri: get_actor_url(data["url"]),
1455 discoverable: discoverable,
1456 invisible: invisible,
1459 follower_address: data["followers"],
1460 following_address: data["following"],
1461 bio: data["summary"],
1462 actor_type: actor_type,
1463 also_known_as: Map.get(data, "alsoKnownAs", [])
1466 # nickname can be nil because of virtual actors
1468 if data["preferredUsername"] do
# Nickname is "<preferredUsername>@<host of the actor id>".
1472 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1475 Map.put(user_data, :nickname, nil)
# Fetches the remote following and follower collections of `user` and derives
# follow counters and hide flags from them. `user` only needs to expose
# `following_address` and `follower_address`.
1481 def fetch_follow_information_for_user(user) do
1482 with {:ok, following_data} <-
1483 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
# collection_private/1 decides whether the collection counts as hidden.
1484 {:ok, hide_follows} <- collection_private(following_data),
1485 {:ok, followers_data} <-
1486 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1487 {:ok, hide_followers} <- collection_private(followers_data) do
1490 hide_follows: hide_follows,
# Counters come from "totalItems"; non-integer values are normalised to 0.
1491 follower_count: normalize_counter(followers_data["totalItems"]),
1492 following_count: normalize_counter(following_data["totalItems"]),
1493 hide_followers: hide_followers
# Error tuples from any step are passed through unchanged.
1496 {:error, _} = e -> e
# Returns the counter as-is when it is an integer; anything else (nil, strings,
# floats, ...) becomes 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Merges remotely fetched follow information into the :info key of the user
# data map, but only when the instance setting
# [:instance, :external_user_synchronization] is enabled.
1504 defp maybe_update_follow_information(data) do
# The setting is tagged with :enabled so the disabled case can be told apart
# from a failed fetch in the else branch.
1505 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# The user data map itself is passed where a user is expected.
1506 {:ok, info} <- fetch_follow_information_for_user(data) do
# Keys from the fetched info win over what is already under data[:info].
1507 info = Map.merge(data[:info] || %{}, info)
1508 Map.put(data, :info, info)
1510 {:enabled, false} ->
# Message used for the failure case; it includes the inspected error term.
1515 "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
# Decides whether a remote collection counts as private. Callers bind the
# `{:ok, boolean}` result to their hide_follows / hide_followers flags.
# First clause: the "first" page is embedded in the collection as a map of a
# page type.
1522 defp collection_private(%{"first" => %{"type" => type}})
1523 when type in ["CollectionPage", "OrderedCollectionPage"],
# Second clause: "first" is not an embedded page, so it is fetched remotely.
1526 defp collection_private(%{"first" => first}) do
1527 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1528 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# A 401/403 response to the page fetch means the collection is private.
1531 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
# Other error tuples are passed through to the caller.
1532 {:error, _} = e -> e
# Anything without a "first" entry is treated as private.
1537 defp collection_private(_data), do: {:ok, true}
# Converts a remote actor object into user data: the object is first passed
# through MRF.filter/1, then mapped by object_to_user_data/1.
1539 def user_data_from_user_object(data) do
1540 with {:ok, data} <- MRF.filter(data),
1541 {:ok, data} <- object_to_user_data(data) do
# Fetches the actor object at `ap_id`, converts it to user data and, where
# enabled, adds follow information to it.
1548 def fetch_and_prepare_user_from_ap_id(ap_id) do
1549 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1550 {:ok, data} <- user_data_from_user_object(data),
# Plain match: this step always succeeds with whatever the helper returns.
1551 data <- maybe_update_follow_information(data) do
# A deleted object is only logged at debug level.
1554 {:error, "Object has been deleted"} = e ->
1555 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# The same message is logged at error level on another failure path.
1559 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Returns a user for the given AP id. When a user with that AP id is already
# cached, it is upgraded via the Transmogrifier; otherwise the actor is
# fetched and inserted or updated.
1564 def make_user_from_ap_id(ap_id) do
# The cached user is only used as a presence check, hence the `_user` binding.
1565 if _user = User.get_cached_by_ap_id(ap_id) do
1566 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1568 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1569 User.insert_or_update_user(data)
# Resolves a nickname through WebFinger and builds the user from the AP id it
# returns. Any WebFinger result without a non-nil "ap_id" yields
# {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Broken-thread filter: delegates to `entire_thread_visible_for_user?/2` for
# the given activity and user and returns its result.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity; currently it only runs the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Builds a query for direct messages: "Create" activities with "direct"
# visibility, ordered by ascending activity id.
1594 def fetch_direct_messages_query do
1596 |> restrict_type(%{"type" => "Create"})
# Note the atom key :visibility here, unlike the string keys used for "type".
1597 |> restrict_visibility(%{visibility: "direct"})
1598 |> order_by([activity], asc: activity.id)