1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
70 defp check_actor_is_active(actor) do
71 if not is_nil(actor) do
72 with user <- User.get_cached_by_ap_id(actor),
73 false <- user.deactivated do
# Checks incoming object content against the `[:instance, :remote_limit]`
# setting. Content length is measured with `String.length/1`; activities
# without (non-nil) object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  [:instance, :remote_limit]
  |> Config.get()
  |> Kernel.>=(String.length(content))
end

defp check_remote_limit(_activity), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public.

For a non-public object the actor is returned untouched as `{:ok, actor}`.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public.

For a non-public object the actor is returned untouched as `{:ok, actor}`.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
98 def increase_replies_count_if_reply(%{
99 "object" => %{"inReplyTo" => reply_ap_id} = object,
102 if is_public?(object) do
103 Object.increase_replies_count(reply_ap_id)
107 def increase_replies_count_if_reply(_create_data), do: :noop
@doc """
Calls `Object.decrease_replies_count/1` for the parent of a reply.

Applies to objects whose data has an `"inReplyTo"` key. The counter is only
touched when that data is public; a non-public reply yields `nil`. Any other
argument results in `:noop`.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = data}) do
  if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
119 def increase_poll_votes_if_vote(%{
120 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
123 Object.increase_vote_count(reply_ap_id, name)
126 def increase_poll_votes_if_vote(_create_data), do: :noop
128 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
129 # TODO rewrite in with style
130 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
131 def persist(object, meta) do
132 local = Keyword.fetch!(meta, :local)
133 {recipients, _, _} = get_recipients(object)
136 Repo.insert(%Activity{
139 recipients: recipients,
140 actor: object["actor"]
143 {:ok, activity, meta}
146 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
147 with nil <- Activity.normalize(map),
148 map <- lazy_put_activity_defaults(map, fake),
149 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
150 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
151 {:ok, map} <- MRF.filter(map),
152 {recipients, _, _} = get_recipients(map),
153 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
154 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
155 {:ok, map, object} <- insert_full_object(map) do
157 Repo.insert(%Activity{
161 recipients: recipients
164 # Splice in the child object if we have one.
166 if not is_nil(object) do
167 Map.put(activity, :object, object)
172 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
174 Notification.create_notifications(activity)
176 conversation = create_or_bump_conversation(activity, map["actor"])
177 participations = get_participations(conversation)
179 stream_out_participations(participations)
182 %Activity{} = activity ->
185 {:fake, true, map, recipients} ->
186 activity = %Activity{
190 recipients: recipients,
194 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
202 defp create_or_bump_conversation(activity, actor) do
203 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
204 %User{} = user <- User.get_cached_by_ap_id(actor),
205 Participation.mark_as_read(user, conversation) do
# Extracts the participations of a conversation wrapped in an `{:ok, _}` tuple.
# The association is preloaded with `force: true`, so it is fetched again even
# if it was already loaded. Any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
@doc """
Streams the given participations on the `"participation"` topic.

The `:user` association of every participation is preloaded before the list
is handed to `Streamer.stream/2`.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
226 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
227 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
228 conversation = Repo.preload(conversation, :participations),
230 fetch_latest_activity_id_for_context(conversation.ap_id, %{
232 "blocking_user" => user
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
240 def stream_out_participations(_, _), do: :noop
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
249 def stream_out(_activity) do
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # only accept false as false value
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
269 %{to: to, actor: actor, published: published, context: context, object: object},
272 {:ok, activity} <- insert(create_data, local, fake),
273 {:fake, false, activity} <- {:fake, fake, activity},
274 _ <- increase_replies_count_if_reply(create_data),
275 _ <- increase_poll_votes_if_vote(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 :ok <- maybe_federate(activity) do
281 {:quick_insert, true, activity} ->
284 {:fake, true, activity} ->
288 Repo.rollback(message)
292 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
293 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
294 additional = params[:additional] || %{}
295 # only accept false as false value
296 local = !(params[:local] == false)
297 published = params[:published]
301 %{to: to, actor: actor, published: published, context: context, object: object},
304 {:ok, activity} <- insert(listen_data, local),
305 :ok <- maybe_federate(activity) do
@doc "Delegates to `accept_or_reject/2` using the `Accept` activity type."
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
@doc "Delegates to `accept_or_reject/2` using the `Reject` activity type."
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
320 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
321 def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
322 local = Map.get(params, :local, true)
323 activity_id = Map.get(params, :activity_id, nil)
326 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
327 |> Utils.maybe_put("id", activity_id),
328 {:ok, activity} <- insert(data, local),
329 :ok <- maybe_federate(activity) do
334 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
335 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
336 local = !(params[:local] == false)
337 activity_id = params[:activity_id]
346 data <- Utils.maybe_put(data, "id", activity_id),
347 {:ok, activity} <- insert(data, local),
348 :ok <- maybe_federate(activity) do
353 @spec react_with_emoji(User.t(), Object.t(), String.t(), keyword()) ::
354 {:ok, Activity.t(), Object.t()} | {:error, any()}
355 def react_with_emoji(user, object, emoji, options \\ []) do
356 with {:ok, result} <-
357 Repo.transaction(fn -> do_react_with_emoji(user, object, emoji, options) end) do
362 defp do_react_with_emoji(user, object, emoji, options) do
363 with local <- Keyword.get(options, :local, true),
364 activity_id <- Keyword.get(options, :activity_id, nil),
365 true <- Pleroma.Emoji.is_unicode_emoji?(emoji),
366 reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
367 {:ok, activity} <- insert(reaction_data, local),
368 {:ok, object} <- add_emoji_reaction_to_object(activity, object),
369 :ok <- maybe_federate(activity) do
370 {:ok, activity, object}
372 false -> {:error, false}
373 {:error, error} -> Repo.rollback(error)
377 @spec unreact_with_emoji(User.t(), String.t(), keyword()) ::
378 {:ok, Activity.t(), Object.t()} | {:error, any()}
379 def unreact_with_emoji(user, reaction_id, options \\ []) do
380 with {:ok, result} <-
381 Repo.transaction(fn -> do_unreact_with_emoji(user, reaction_id, options) end) do
386 defp do_unreact_with_emoji(user, reaction_id, options) do
387 with local <- Keyword.get(options, :local, true),
388 activity_id <- Keyword.get(options, :activity_id, nil),
389 user_ap_id <- user.ap_id,
390 %Activity{actor: ^user_ap_id} = reaction_activity <- Activity.get_by_ap_id(reaction_id),
391 object <- Object.normalize(reaction_activity),
392 unreact_data <- make_undo_data(user, reaction_activity, activity_id),
393 {:ok, activity} <- insert(unreact_data, local),
394 {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
395 :ok <- maybe_federate(activity) do
396 {:ok, activity, object}
398 {:error, error} -> Repo.rollback(error)
402 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
403 @spec like(User.t(), Object.t(), String.t() | nil, boolean()) ::
404 {:ok, Activity.t(), Object.t()} | {:error, any()}
405 def like(user, object, activity_id \\ nil, local \\ true) do
406 with {:ok, result} <- Repo.transaction(fn -> do_like(user, object, activity_id, local) end) do
412 %User{ap_id: ap_id} = user,
413 %Object{data: %{"id" => _}} = object,
417 with nil <- get_existing_like(ap_id, object),
418 like_data <- make_like_data(user, object, activity_id),
419 {:ok, activity} <- insert(like_data, local),
420 {:ok, object} <- add_like_to_object(activity, object),
421 :ok <- maybe_federate(activity) do
422 {:ok, activity, object}
424 %Activity{} = activity ->
425 {:ok, activity, object}
432 @spec unlike(User.t(), Object.t(), String.t() | nil, boolean()) ::
433 {:ok, Activity.t(), Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
434 def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
435 with {:ok, result} <-
436 Repo.transaction(fn -> do_unlike(actor, object, activity_id, local) end) do
441 defp do_unlike(actor, object, activity_id, local) do
442 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
443 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
444 {:ok, unlike_activity} <- insert(unlike_data, local),
445 {:ok, _activity} <- Repo.delete(like_activity),
446 {:ok, object} <- remove_like_from_object(like_activity, object),
447 :ok <- maybe_federate(unlike_activity) do
448 {:ok, unlike_activity, like_activity, object}
451 {:error, error} -> Repo.rollback(error)
455 @spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
456 {:ok, Activity.t(), Object.t()} | {:error, any()}
458 %User{ap_id: _} = user,
459 %Object{data: %{"id" => _}} = object,
464 with {:ok, result} <-
465 Repo.transaction(fn -> do_announce(user, object, activity_id, local, public) end) do
470 defp do_announce(user, object, activity_id, local, public) do
471 with true <- is_announceable?(object, user, public),
472 announce_data <- make_announce_data(user, object, activity_id, public),
473 {:ok, activity} <- insert(announce_data, local),
474 {:ok, object} <- add_announce_to_object(activity, object),
475 :ok <- maybe_federate(activity) do
476 {:ok, activity, object}
478 false -> {:error, false}
479 {:error, error} -> Repo.rollback(error)
483 @spec unannounce(User.t(), Object.t(), String.t() | nil, boolean()) ::
484 {:ok, Activity.t(), Object.t()} | {:ok, Object.t()} | {:error, any()}
491 with {:ok, result} <-
492 Repo.transaction(fn -> do_unannounce(actor, object, activity_id, local) end) do
497 defp do_unannounce(actor, object, activity_id, local) do
498 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
499 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
500 {:ok, unannounce_activity} <- insert(unannounce_data, local),
501 :ok <- maybe_federate(unannounce_activity),
502 {:ok, _activity} <- Repo.delete(announce_activity),
503 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
504 {:ok, unannounce_activity, object}
507 {:error, error} -> Repo.rollback(error)
511 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
512 {:ok, Activity.t()} | {:error, any()}
513 def follow(follower, followed, activity_id \\ nil, local \\ true) do
514 with {:ok, result} <-
515 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
520 defp do_follow(follower, followed, activity_id, local) do
521 with data <- make_follow_data(follower, followed, activity_id),
522 {:ok, activity} <- insert(data, local),
523 :ok <- maybe_federate(activity),
524 _ <- User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"]) do
527 {:error, error} -> Repo.rollback(error)
531 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
532 {:ok, Activity.t()} | nil | {:error, any()}
533 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
534 with {:ok, result} <-
535 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
540 defp do_unfollow(follower, followed, activity_id, local) do
541 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
542 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
543 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
544 {:ok, activity} <- insert(unfollow_data, local),
545 :ok <- maybe_federate(activity) do
549 {:error, error} -> Repo.rollback(error)
553 @spec delete(User.t() | Object.t(), keyword()) :: {:ok, User.t() | Object.t()} | {:error, any()}
554 def delete(entity, options \\ []) do
555 with {:ok, result} <- Repo.transaction(fn -> do_delete(entity, options) end) do
560 defp do_delete(%User{ap_id: ap_id, follower_address: follower_address} = user, _) do
562 "to" => [follower_address],
565 "object" => %{"type" => "Person", "id" => ap_id}
567 {:ok, activity} <- insert(data, true, true, true),
568 :ok <- maybe_federate(activity) do
573 defp do_delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options) do
574 local = Keyword.get(options, :local, true)
575 activity_id = Keyword.get(options, :activity_id, nil)
576 actor = Keyword.get(options, :actor, actor)
578 user = User.get_cached_by_ap_id(actor)
579 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
581 with create_activity <- Activity.get_create_by_object_ap_id(id),
588 "deleted_activity_id" => create_activity && create_activity.id
590 |> maybe_put("id", activity_id),
591 {:ok, activity} <- insert(data, local, false),
592 {:ok, object, _create_activity} <- Object.delete(object),
593 stream_out_participations(object, user),
594 _ <- decrease_replies_count_if_reply(object),
595 {:ok, _actor} <- decrease_note_count_if_public(user, object),
596 :ok <- maybe_federate(activity) do
604 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
605 {:ok, Activity.t()} | {:error, any()}
606 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
607 with {:ok, result} <-
608 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
613 defp do_block(blocker, blocked, activity_id, local) do
614 outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
615 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
617 if unfollow_blocked do
618 follow_activity = fetch_latest_follow(blocker, blocked)
619 if follow_activity, do: unfollow(blocker, blocked, nil, local)
622 with true <- outgoing_blocks,
623 block_data <- make_block_data(blocker, blocked, activity_id),
624 {:ok, activity} <- insert(block_data, local),
625 :ok <- maybe_federate(activity) do
628 {:error, error} -> Repo.rollback(error)
632 @spec unblock(User.t(), User.t(), String.t() | nil, boolean()) ::
633 {:ok, Activity.t()} | {:error, any()} | nil
634 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
635 with {:ok, result} <-
636 Repo.transaction(fn -> do_unblock(blocker, blocked, activity_id, local) end) do
641 defp do_unblock(blocker, blocked, activity_id, local) do
642 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
643 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
644 {:ok, activity} <- insert(unblock_data, local),
645 :ok <- maybe_federate(activity) do
649 {:error, error} -> Repo.rollback(error)
653 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
663 # only accept false as false value
664 local = !(params[:local] == false)
665 forward = !(params[:forward] == false)
667 additional = params[:additional] || %{}
671 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
673 Map.merge(additional, %{"to" => [], "cc" => []})
676 with flag_data <- make_flag_data(params, additional),
677 {:ok, activity} <- insert(flag_data, local),
678 {:ok, stripped_activity} <- strip_report_status_data(activity),
679 :ok <- maybe_federate(stripped_activity) do
680 User.all_superusers()
681 |> Enum.filter(fn user -> not is_nil(user.email) end)
682 |> Enum.each(fn superuser ->
684 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
685 |> Pleroma.Emails.Mailer.deliver_async()
692 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
693 def move(%User{} = origin, %User{} = target, local \\ true) do
696 "actor" => origin.ap_id,
697 "object" => origin.ap_id,
698 "target" => target.ap_id
701 with true <- origin.ap_id in target.also_known_as,
702 {:ok, activity} <- insert(params, local) do
703 maybe_federate(activity)
705 BackgroundWorker.enqueue("move_following", %{
706 "origin_id" => origin.id,
707 "target_id" => target.id
712 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
717 defp fetch_activities_for_context_query(context, opts) do
718 public = [Constants.as_public()]
722 do: [opts["user"].ap_id | User.following(opts["user"])] ++ public,
725 from(activity in Activity)
726 |> maybe_preload_objects(opts)
727 |> maybe_preload_bookmarks(opts)
728 |> maybe_set_thread_muted_field(opts)
729 |> restrict_blocked(opts)
730 |> restrict_recipients(recipients, opts["user"])
734 "?->>'type' = ? and ?->>'context' = ?",
741 |> exclude_poll_votes(opts)
743 |> order_by([activity], desc: activity.id)
746 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
747 def fetch_activities_for_context(context, opts \\ %{}) do
749 |> fetch_activities_for_context_query(opts)
753 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
754 FlakeId.Ecto.CompatType.t() | nil
755 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
757 |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
@doc """
Fetches activities addressed to the public collection.

The `"user"` option is dropped before the query is built, the query is passed
through `restrict_unlisted/1`, and the result is paginated with the requested
`pagination` strategy (`:keyset` by default).
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.drop(opts, ["user"])

  public_query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted()

  Pagination.fetch_paginated(public_query, anonymous_opts, pagination)
end
773 @valid_visibilities ~w[direct unlisted public private]
775 defp restrict_visibility(query, %{visibility: visibility})
776 when is_list(visibility) do
777 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
783 "activity_visibility(?, ?, ?) = ANY (?)",
793 Logger.error("Could not restrict visibility to #{visibility}")
797 defp restrict_visibility(query, %{visibility: visibility})
798 when visibility in @valid_visibilities do
802 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# An unknown visibility cannot be turned into a filter. The problem is logged
# and the query is handed back unchanged, so the remaining steps of the query
# pipeline still receive a query. (`Logger.error/1` itself returns `:ok`, which
# the following steps cannot work with.)
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

# Options without a `:visibility` key (or of any other shape): nothing to restrict.
defp restrict_visibility(query, _visibility), do: query
813 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
814 when is_list(visibility) do
815 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
820 "activity_visibility(?, ?, ?) = ANY (?)",
828 Logger.error("Could not exclude visibility to #{visibility}")
833 defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
834 when visibility in @valid_visibilities do
839 "activity_visibility(?, ?, ?) = ?",
# An unknown visibility cannot be turned into an exclusion filter. The problem
# is logged and the query is handed back unchanged, so the caller keeps working
# with a query instead of the `:ok` returned by `Logger.error/1`.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

# Options without an "exclude_visibilities" key (or of any other shape):
# nothing to exclude.
defp exclude_visibility(query, _visibility), do: query
856 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
859 defp restrict_thread_visibility(
861 %{"user" => %User{skip_thread_containment: true}},
866 defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
869 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
873 defp restrict_thread_visibility(query, _, _), do: query
875 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
878 |> Map.put("user", reading_user)
879 |> Map.put("actor_id", user.ap_id)
882 user_activities_recipients(%{
883 "godmode" => params["godmode"],
884 "reading_user" => reading_user
887 fetch_activities(recipients, params)
891 def fetch_user_activities(user, reading_user, params \\ %{}) do
894 |> Map.put("type", ["Create", "Announce"])
895 |> Map.put("user", reading_user)
896 |> Map.put("actor_id", user.ap_id)
897 |> Map.put("pinned_activity_ids", user.pinned_activities)
900 if User.blocks?(reading_user, user) do
904 |> Map.put("blocking_user", reading_user)
905 |> Map.put("muting_user", reading_user)
909 user_activities_recipients(%{
910 "godmode" => params["godmode"],
911 "reading_user" => reading_user
914 fetch_activities(recipients, params)
918 def fetch_statuses(reading_user, params) do
921 |> Map.put("type", ["Create", "Announce"])
924 user_activities_recipients(%{
925 "godmode" => params["godmode"],
926 "reading_user" => reading_user
929 fetch_activities(recipients, params, :offset)
933 defp user_activities_recipients(%{"godmode" => true}) do
937 defp user_activities_recipients(%{"reading_user" => reading_user}) do
939 [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)]
941 [Constants.as_public()]
# Keeps only activities whose id is greater than the "since_id" option.
# A missing or empty-string "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
# Drops activities whose object carries any of the tags in "tag_reject"
# (the `?|` operator, escaped as `\\?|` inside the fragment).
# The filter reads the joined object, so combining it with "skip_preload"
# is rejected up front. A non-list or empty "tag_reject" is ignored.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  from([_a, obj] in query, where: fragment("not (?)->'tag' \\?| (?)", obj.data, ^tag_reject))
end

defp restrict_tag_reject(query, _opts), do: query
# Keeps activities whose object carries every tag in "tag_all"
# (the `?&` operator, escaped as `\\?&` inside the fragment).
# The filter reads the joined object, so combining it with "skip_preload"
# is rejected up front. A non-list or empty "tag_all" is ignored.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  from([_a, obj] in query, where: fragment("(?)->'tag' \\?& (?)", obj.data, ^tag_all))
end

defp restrict_tag_all(query, _opts), do: query
# Keeps activities whose object is tagged with "tag".
#   * list   - any of the given tags matches (`?|`)
#   * binary - exactly that tag is present (`?`)
# The filter reads the joined object, so combining it with "skip_preload"
# is rejected up front. Other values leave the query untouched.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  from([_a, obj] in query, where: fragment("(?)->'tag' \\?| (?)", obj.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  from([_a, obj] in query, where: fragment("(?)->'tag' \\? (?)", obj.data, ^single_tag))
end

defp restrict_tag(query, _opts), do: query
# Limits the query to activities whose recipients overlap (`&&`) with the
# given list. With a user, activities authored by that user are included as
# well. An empty recipient list applies no restriction at all.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  from(a in query, where: fragment("? && ?", ^recipients, a.recipients))
end

defp restrict_recipients(query, recipients, user) do
  from(a in query,
    where: fragment("? && ?", ^recipients, a.recipients),
    or_where: a.actor == ^user.ap_id
  )
end
# With "local_only" set to `true`, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}),
  do: from(a in query, where: a.local == true)

defp restrict_local(query, _opts), do: query
# With an "actor_id" option, keeps only activities whose actor equals it.
defp restrict_actor(query, %{"actor_id" => actor_ap_id}),
  do: from(a in query, where: a.actor == ^actor_ap_id)

defp restrict_actor(query, _opts), do: query
# Filters on the activity's `type` field. A binary is compared for equality;
# any other "type" value is matched with `ANY(...)`.
defp restrict_type(query, %{"type" => single_type}) when is_binary(single_type) do
  from(a in query, where: fragment("?->>'type' = ?", a.data, ^single_type))
end

defp restrict_type(query, %{"type" => types}) do
  from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))
end

defp restrict_type(query, _opts), do: query
# With a "state" option, keeps only activities whose `state` field equals it.
defp restrict_state(query, %{"state" => wanted_state}),
  do: from(a in query, where: fragment("?->>'state' = ?", a.data, ^wanted_state))

defp restrict_state(query, _opts), do: query
# With a "favorited_by" option, keeps activities whose joined object lists the
# given AP id under `likes`.
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  from([_a, obj] in query, where: fragment("(?)->'likes' \\? (?)", obj.data, ^liker_ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
# With "only_media" set to "true" or "1", keeps activities whose joined object
# has an `attachment` value different from the empty list.
# The filter reads the joined object, so combining it with "skip_preload"
# is rejected up front.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}),
  do: raise("Can't use the child object without preloading!")

defp restrict_media(query, %{"only_media" => flag}) when flag in ["true", "1"] do
  from([_a, obj] in query, where: fragment("not (?)->'attachment' = (?)", obj.data, ^[]))
end

defp restrict_media(query, _opts), do: query
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# joined object has no `inReplyTo` value.
defp restrict_replies(query, %{"exclude_replies" => flag}) when flag in ["true", "1"] do
  from([_a, obj] in query, where: fragment("?->>'inReplyTo' is null", obj.data))
end

defp restrict_replies(query, _opts), do: query
# With "exclude_reblogs" set to "true" or "1", drops `Announce` activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in ["true", "1"] do
  from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
1080 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
1082 defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
1083 mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)
1086 from([activity] in query,
1087 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1088 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
1091 unless opts["skip_preload"] do
1092 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1098 defp restrict_muted(query, _), do: query
1100 defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
1101 blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
1102 domain_blocks = user.domain_blocks || []
1104 following_ap_ids = User.get_friends_ap_ids(user)
1107 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1110 [activity, object: o] in query,
1111 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1112 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
1115 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1122 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1130 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1139 defp restrict_blocked(query, _), do: query
1141 defp restrict_unlisted(query) do
1146 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1148 ^[Constants.as_public()]
# With "pinned" set to "true", keeps only activities whose id is contained in
# the supplied "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}),
  do: from(a in query, where: a.id in ^pinned_ids)

defp restrict_pinned(query, _opts), do: query
1159 defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
1160 muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)
1166 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1174 defp restrict_muted_reblogs(query, _), do: query
1176 defp restrict_instance(query, %{"instance" => instance}) do
1181 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1185 from(activity in query, where: activity.actor in ^users)
1188 defp restrict_instance(query, _), do: query
1190 defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query
1192 defp exclude_poll_votes(query, _) do
1193 if has_named_binding?(query, :object) do
1194 from([activity, object: o] in query,
1195 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# With a binary "exclude_id" option, drops the activity carrying that id.
defp exclude_id(query, %{"exclude_id" => excluded_id}) when is_binary(excluded_id),
  do: from(a in query, where: a.id != ^excluded_id)

defp exclude_id(query, _opts), do: query
# Applies `Activity.with_preloaded_object/1` unless "skip_preload" is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
# Applies `Activity.with_preloaded_bookmark/2` for the "user" option unless
# "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
# Applies `Activity.with_preloaded_report_notes/1` only when explicitly
# requested through "preload_report_notes" => true.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
# Applies `Activity.with_set_thread_muted_field/2` unless "skip_preload" is
# `true`. The user is taken from "muting_user", falling back to "user".
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
# Orders the query by id when an `:order` option of `:desc` or `:asc` is given;
# any other options leave the ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
# Loads the outgoing relation AP IDs of opts["muting_user"] in a single call
# and returns three option maps (blocked, muted, muted-reblogs), each being
# `opts` plus the matching preloaded list. Keys already present in `opts`
# take precedence over the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts["muting_user"]

  mute_relations = if source_user, do: [:mute, :reblog_mute], else: []

  # Blocks are only preloaded when the blocking user is the same user whose
  # mutes are being loaded.
  block_relations =
    if opts["blocking_user"] && opts["blocking_user"] == source_user do
      [:block]
    else
      []
    end

  preloaded_ap_ids =
    User.outgoing_relations_ap_ids(source_user, mute_relations ++ block_relations)

  blocked_opts = Map.put_new(opts, "blocked_users_ap_ids", preloaded_ap_ids[:block])
  muted_opts = Map.put_new(opts, "muted_users_ap_ids", preloaded_ap_ids[:mute])

  muted_reblogs_opts =
    Map.put_new(opts, "reblog_muted_users_ap_ids", preloaded_ap_ids[:reblog_mute])

  {blocked_opts, muted_opts, muted_reblogs_opts}
end
# Builds the activity query for `recipients` by piping `Activity` through the
# preload, ordering, restrict_* and exclude_* steps below, each driven by
# `opts`. The steps are applied in this exact order.
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
# Fetches a page of activities for `recipients` plus the list memberships of
# opts["user"], reverses the page, and then lets maybe_update_cc/3 adjust the
# "cc" of list-addressed activities.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts["user"]
  list_memberships = Pleroma.List.memberships(viewer)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
@doc """
Fetches the activities favourited by `user`.

Results are ordered by the id of the user's "Like" activity, descending, so
they follow the order in which the favourites were added.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc: like.id)

  # Ordering is already fixed above, so pagination is told not to add its own.
  pagination_params = Map.merge(params, %{"skip_order" => true})

  Pagination.fetch_paginated(
    favourites_query,
    pagination_params,
    pagination,
    :object_activity
  )
end
# Prepends the user's AP ID to the "cc" of every activity whose "bcc" contains
# at least one of `list_memberships`. Activities without a non-empty "bcc"
# list, or whose "bcc" shares nothing with the memberships, are returned
# unchanged. With empty memberships or no %User{} the activities pass through
# untouched.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a bare value; List.wrap/1 keeps the
        # result a proper list instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Restricts `query` to activities whose recipients overlap `recipients`, or
# overlap `recipients_with_public` while also containing the public address.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
# Fetches a page of activities using the bounded recipient filter from
# fetch_activities_bounded_query/3 and returns the page reversed.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
# Stores `file` through Upload.store/2 and inserts an Object holding the
# returned data. When opts[:actor] is set it is recorded under "actor".
# A failed store result is returned as-is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    obj_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: obj_data})
  end
end
# Converts a fetched actor object (a string-keyed map) into the attribute map
# that is later handed to User.insert_or_update_user/1.
# Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # The object comes from a remote server: "attachment" may be absent, null
  # or a single object instead of a list, and entries may lack a "type".
  # None of that should abort the conversion, so the value is normalised with
  # List.wrap/1 and non-matching entries are simply filtered out.
  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # `locked` is read before the object is passed through the fixer below.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  user_data = %{
    ap_id: data["id"],
    ap_enabled: true,
    source_data: data,
    banner: banner,
    fields: fields,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", [])
  }

  # nickname can be nil because of virtual actors
  user_data =
    if data["preferredUsername"] do
      Map.put(
        user_data,
        :nickname,
        "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
      )
    else
      Map.put(user_data, :nickname, nil)
    end

  {:ok, user_data}
end
# Fetches the user's following and followers collections and returns
# `{:ok, map}` with both counters and the hide_follows / hide_followers flags
# computed by collection_private/1. Any failure is returned as an
# `{:error, reason}` tuple.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
# Returns the counter when it is an integer; anything else counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# When external user synchronization is enabled, fetches follow information
# and merges it into data[:info]. If the setting is `false` the data is
# returned untouched; any other outcome is logged as an error and the data is
# still returned unchanged.
defp maybe_update_follow_information(data) do
  sync_setting = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_setting},
       {:ok, fetched_info} <- fetch_follow_information_for_user(data) do
    merged_info = Map.merge(data[:info] || %{}, fetched_info)
    Map.put(data, :info, merged_info)
  else
    {:enabled, false} ->
      data

    e ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(e)
      )

      data
  end
end
# Decides whether a collection is private, returning `{:ok, boolean}` or an
# `{:error, reason}` tuple.
#
# An inlined first page of a known page type means the collection is not
# private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

# Otherwise "first" is fetched: a page of a known type means not private, a
# 401/403 response means private, and anything else is an error.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

# A collection without "first" is treated as private.
defp collection_private(_data), do: {:ok, true}
# Runs the object through MRF.filter/1 and converts the result into user
# attributes. Any non-matching step result is wrapped as `{:error, result}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, user_data} <- object_to_user_data(filtered) do
    {:ok, user_data}
  else
    failure -> {:error, failure}
  end
end
# Fetches the object at `ap_id`, converts it to user attributes and, where
# possible, enriches it with follow information. Failures are logged (at
# debug level for deleted objects, error level otherwise) and returned
# wrapped in `{:error, _}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = e ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
# Returns a user for `ap_id`: a user that is already cached is upgraded via
# the Transmogrifier, otherwise the remote object is fetched and inserted or
# updated. A failed fetch is wrapped in `{:error, _}`.
def make_user_from_ap_id(ap_id) do
  known_user = User.get_cached_by_ap_id(ap_id)

  if known_user do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      e -> {:error, e}
    end
  end
end
# Resolves `nickname` through WebFinger and builds the user from the AP ID it
# yields. Any WebFinger result without a non-nil "ap_id" is an error.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _e ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns the result of checking whether the
# entire thread of `activity` is visible for `user`.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity; currently it only applies the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Query for "Create" activities with "direct" visibility, ordered by
# ascending activity id.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [a], asc: a.id)
end