1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 defp increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
92 Object.increase_vote_count(reply_ap_id, name, actor)
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
109 Repo.insert(%Activity{
112 recipients: recipients,
113 actor: object["actor"]
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
135 recipients: recipients
138 |> maybe_create_activity_expiration()
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
147 %Activity{} = activity ->
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
155 recipients: recipients,
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
173 stream_out_participations(participations)
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
182 defp maybe_create_activity_expiration(result), do: result
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
192 defp get_participations({:ok, conversation}) do
194 |> Repo.preload(:participations, force: true)
195 |> Map.get(:participations)
198 defp get_participations(_), do: []
200 def stream_out_participations(participations) do
203 |> Repo.preload(:user)
205 Streamer.stream("participation", participations)
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
213 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
224 def stream_out_participations(_, _), do: :noop
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
233 def stream_out(_activity) do
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
253 %{to: to, actor: actor, published: published, context: context, object: object},
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
267 {:quick_insert, true, activity} ->
270 {:fake, true, activity} ->
274 Repo.rollback(message)
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
287 %{to: to, actor: actor, published: published, context: context, object: object},
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
298 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
299 def accept(params) do
300 accept_or_reject("Accept", params)
303 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
304 def reject(params) do
305 accept_or_reject("Reject", params)
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
325 {:ok, Activity.t()} | {:error, any()}
326 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
327 with {:ok, result} <-
328 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
333 defp do_follow(follower, followed, activity_id, local, opts) do
334 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
335 data = make_follow_data(follower, followed, activity_id)
337 with {:ok, activity} <- insert(data, local),
338 _ <- skip_notify_and_stream || notify_and_stream(activity),
339 :ok <- maybe_federate(activity) do
342 {:error, error} -> Repo.rollback(error)
346 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
347 {:ok, Activity.t()} | nil | {:error, any()}
348 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
355 defp do_unfollow(follower, followed, activity_id, local) do
356 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
357 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
358 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
359 {:ok, activity} <- insert(unfollow_data, local),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
365 {:error, error} -> Repo.rollback(error)
369 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
370 {:ok, Activity.t()} | {:error, any()}
371 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
372 with {:ok, result} <-
373 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
378 defp do_block(blocker, blocked, activity_id, local) do
379 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
381 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
382 unfollow(blocker, blocked, nil, local)
385 block_data = make_block_data(blocker, blocked, activity_id)
387 with {:ok, activity} <- insert(block_data, local),
388 _ <- notify_and_stream(activity),
389 :ok <- maybe_federate(activity) do
392 {:error, error} -> Repo.rollback(error)
396 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
406 # only accept false as false value
407 local = !(params[:local] == false)
408 forward = !(params[:forward] == false)
410 additional = params[:additional] || %{}
414 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
416 Map.merge(additional, %{"to" => [], "cc" => []})
419 with flag_data <- make_flag_data(params, additional),
420 {:ok, activity} <- insert(flag_data, local),
421 {:ok, stripped_activity} <- strip_report_status_data(activity),
422 _ <- notify_and_stream(activity),
423 :ok <- maybe_federate(stripped_activity) do
424 User.all_superusers()
425 |> Enum.filter(fn user -> not is_nil(user.email) end)
426 |> Enum.each(fn superuser ->
428 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
429 |> Pleroma.Emails.Mailer.deliver_async()
436 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
437 def move(%User{} = origin, %User{} = target, local \\ true) do
440 "actor" => origin.ap_id,
441 "object" => origin.ap_id,
442 "target" => target.ap_id
445 with true <- origin.ap_id in target.also_known_as,
446 {:ok, activity} <- insert(params, local),
447 _ <- notify_and_stream(activity) do
448 maybe_federate(activity)
450 BackgroundWorker.enqueue("move_following", %{
451 "origin_id" => origin.id,
452 "target_id" => target.id
457 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
462 def fetch_activities_for_context_query(context, opts) do
463 public = [Constants.as_public()]
467 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
470 from(activity in Activity)
471 |> maybe_preload_objects(opts)
472 |> maybe_preload_bookmarks(opts)
473 |> maybe_set_thread_muted_field(opts)
474 |> restrict_blocked(opts)
475 |> restrict_recipients(recipients, opts[:user])
479 "?->>'type' = ? and ?->>'context' = ?",
486 |> exclude_poll_votes(opts)
488 |> order_by([activity], desc: activity.id)
491 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
492 def fetch_activities_for_context(context, opts \\ %{}) do
494 |> fetch_activities_for_context_query(opts)
498 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
499 FlakeId.Ecto.CompatType.t() | nil
500 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
502 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
503 |> restrict_visibility(%{visibility: "direct"})
509 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
510 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
511 opts = Map.delete(opts, :user)
513 [Constants.as_public()]
514 |> fetch_activities_query(opts)
515 |> restrict_unlisted(opts)
516 |> Pagination.fetch_paginated(opts, pagination)
519 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
520 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
522 |> Map.put(:restrict_unlisted, true)
523 |> fetch_public_or_unlisted_activities(pagination)
526 @valid_visibilities ~w[direct unlisted public private]
528 defp restrict_visibility(query, %{visibility: visibility})
529 when is_list(visibility) do
530 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
535 "activity_visibility(?, ?, ?) = ANY (?)",
543 Logger.error("Could not restrict visibility to #{visibility}")
547 defp restrict_visibility(query, %{visibility: visibility})
548 when visibility in @valid_visibilities do
552 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
556 defp restrict_visibility(_query, %{visibility: visibility})
557 when visibility not in @valid_visibilities do
558 Logger.error("Could not restrict visibility to #{visibility}")
561 defp restrict_visibility(query, _visibility), do: query
563 defp exclude_visibility(query, %{exclude_visibilities: visibility})
564 when is_list(visibility) do
565 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
570 "activity_visibility(?, ?, ?) = ANY (?)",
578 Logger.error("Could not exclude visibility to #{visibility}")
583 defp exclude_visibility(query, %{exclude_visibilities: visibility})
584 when visibility in @valid_visibilities do
589 "activity_visibility(?, ?, ?) = ?",
598 defp exclude_visibility(query, %{exclude_visibilities: visibility})
599 when visibility not in [nil | @valid_visibilities] do
600 Logger.error("Could not exclude visibility to #{visibility}")
604 defp exclude_visibility(query, _visibility), do: query
606 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
609 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
612 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
615 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
619 defp restrict_thread_visibility(query, _, _), do: query
621 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
624 |> Map.put(:user, reading_user)
625 |> Map.put(:actor_id, user.ap_id)
628 godmode: params[:godmode],
629 reading_user: reading_user
631 |> user_activities_recipients()
632 |> fetch_activities(params)
636 def fetch_user_activities(user, reading_user, params \\ %{}) do
639 |> Map.put(:type, ["Create", "Announce"])
640 |> Map.put(:user, reading_user)
641 |> Map.put(:actor_id, user.ap_id)
642 |> Map.put(:pinned_activity_ids, user.pinned_activities)
645 if User.blocks?(reading_user, user) do
649 |> Map.put(:blocking_user, reading_user)
650 |> Map.put(:muting_user, reading_user)
654 godmode: params[:godmode],
655 reading_user: reading_user
657 |> user_activities_recipients()
658 |> fetch_activities(params)
662 def fetch_statuses(reading_user, params) do
663 params = Map.put(params, :type, ["Create", "Announce"])
666 godmode: params[:godmode],
667 reading_user: reading_user
669 |> user_activities_recipients()
670 |> fetch_activities(params, :offset)
674 defp user_activities_recipients(%{godmode: true}), do: []
676 defp user_activities_recipients(%{reading_user: reading_user}) do
678 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
680 [Constants.as_public()]
684 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
685 raise "Can't use the child object without preloading!"
688 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
690 [activity, object] in query,
693 "?->>'type' != ? or ?->>'actor' != ?",
702 defp restrict_announce_object_actor(query, _), do: query
704 defp restrict_since(query, %{since_id: ""}), do: query
706 defp restrict_since(query, %{since_id: since_id}) do
707 from(activity in query, where: activity.id > ^since_id)
710 defp restrict_since(query, _), do: query
712 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
713 raise "Can't use the child object without preloading!"
716 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
718 [_activity, object] in query,
719 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
723 defp restrict_tag_reject(query, _), do: query
725 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
726 raise "Can't use the child object without preloading!"
729 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
731 [_activity, object] in query,
732 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
736 defp restrict_tag_all(query, _), do: query
738 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
739 raise "Can't use the child object without preloading!"
742 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
744 [_activity, object] in query,
745 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
749 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
751 [_activity, object] in query,
752 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
756 defp restrict_tag(query, _), do: query
758 defp restrict_recipients(query, [], _user), do: query
760 defp restrict_recipients(query, recipients, nil) do
761 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
764 defp restrict_recipients(query, recipients, user) do
767 where: fragment("? && ?", ^recipients, activity.recipients),
768 or_where: activity.actor == ^user.ap_id
772 defp restrict_local(query, %{local_only: true}) do
773 from(activity in query, where: activity.local == true)
776 defp restrict_local(query, _), do: query
778 defp restrict_actor(query, %{actor_id: actor_id}) do
779 from(activity in query, where: activity.actor == ^actor_id)
782 defp restrict_actor(query, _), do: query
784 defp restrict_type(query, %{type: type}) when is_binary(type) do
785 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
788 defp restrict_type(query, %{type: type}) do
789 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
792 defp restrict_type(query, _), do: query
794 defp restrict_state(query, %{state: state}) do
795 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
798 defp restrict_state(query, _), do: query
800 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
802 [_activity, object] in query,
803 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
807 defp restrict_favorited_by(query, _), do: query
809 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
810 raise "Can't use the child object without preloading!"
813 defp restrict_media(query, %{only_media: true}) do
815 [activity, object] in query,
816 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
817 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
821 defp restrict_media(query, _), do: query
823 defp restrict_replies(query, %{exclude_replies: true}) do
825 [_activity, object] in query,
826 where: fragment("?->>'inReplyTo' is null", object.data)
830 defp restrict_replies(query, %{
831 reply_filtering_user: user,
832 reply_visibility: "self"
835 [activity, object] in query,
838 "?->>'inReplyTo' is null OR ? = ANY(?)",
846 defp restrict_replies(query, %{
847 reply_filtering_user: user,
848 reply_visibility: "following"
851 [activity, object] in query,
854 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
856 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
865 defp restrict_replies(query, _), do: query
867 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
868 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
871 defp restrict_reblogs(query, _), do: query
873 defp restrict_muted(query, %{with_muted: true}), do: query
875 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
876 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
879 from([activity] in query,
880 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
881 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
884 unless opts[:skip_preload] do
885 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
891 defp restrict_muted(query, _), do: query
893 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
894 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
895 domain_blocks = user.domain_blocks || []
897 following_ap_ids = User.get_friends_ap_ids(user)
900 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
903 [activity, object: o] in query,
904 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
905 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
908 "recipients_contain_blocked_domains(?, ?) = false",
914 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
921 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
929 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
938 defp restrict_blocked(query, _), do: query
940 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
945 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
947 ^[Constants.as_public()]
952 defp restrict_unlisted(query, _), do: query
954 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
955 from(activity in query, where: activity.id in ^ids)
958 defp restrict_pinned(query, _), do: query
960 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
961 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
967 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
975 defp restrict_muted_reblogs(query, _), do: query
977 defp restrict_instance(query, %{instance: instance}) do
982 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
986 from(activity in query, where: activity.actor in ^users)
989 defp restrict_instance(query, _), do: query
991 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
993 defp exclude_poll_votes(query, _) do
994 if has_named_binding?(query, :object) do
995 from([activity, object: o] in query,
996 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1003 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1005 defp exclude_chat_messages(query, _) do
1006 if has_named_binding?(query, :object) do
1007 from([activity, object: o] in query,
1008 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1015 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1017 defp exclude_invisible_actors(query, _opts) do
1019 User.Query.build(%{invisible: true, select: [:ap_id]})
1021 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1023 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1026 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1027 from(activity in query, where: activity.id != ^id)
1030 defp exclude_id(query, _), do: query
1032 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1034 defp maybe_preload_objects(query, _) do
1036 |> Activity.with_preloaded_object()
1039 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1041 defp maybe_preload_bookmarks(query, opts) do
1043 |> Activity.with_preloaded_bookmark(opts[:user])
1046 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1048 |> Activity.with_preloaded_report_notes()
1051 defp maybe_preload_report_notes(query, _), do: query
1053 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1055 defp maybe_set_thread_muted_field(query, opts) do
1057 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1060 defp maybe_order(query, %{order: :desc}) do
1062 |> order_by(desc: :id)
1065 defp maybe_order(query, %{order: :asc}) do
1067 |> order_by(asc: :id)
1070 defp maybe_order(query, _), do: query
1072 defp fetch_activities_query_ap_ids_ops(opts) do
1073 source_user = opts[:muting_user]
1074 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1076 ap_id_relationships =
1077 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1078 [:block | ap_id_relationships]
1083 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1085 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1086 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1088 restrict_muted_reblogs_opts =
1089 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1091 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1094 def fetch_activities_query(recipients, opts \\ %{}) do
1095 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1096 fetch_activities_query_ap_ids_ops(opts)
1099 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1103 |> maybe_preload_objects(opts)
1104 |> maybe_preload_bookmarks(opts)
1105 |> maybe_preload_report_notes(opts)
1106 |> maybe_set_thread_muted_field(opts)
1107 |> maybe_order(opts)
1108 |> restrict_recipients(recipients, opts[:user])
1109 |> restrict_replies(opts)
1110 |> restrict_tag(opts)
1111 |> restrict_tag_reject(opts)
1112 |> restrict_tag_all(opts)
1113 |> restrict_since(opts)
1114 |> restrict_local(opts)
1115 |> restrict_actor(opts)
1116 |> restrict_type(opts)
1117 |> restrict_state(opts)
1118 |> restrict_favorited_by(opts)
1119 |> restrict_blocked(restrict_blocked_opts)
1120 |> restrict_muted(restrict_muted_opts)
1121 |> restrict_media(opts)
1122 |> restrict_visibility(opts)
1123 |> restrict_thread_visibility(opts, config)
1124 |> restrict_reblogs(opts)
1125 |> restrict_pinned(opts)
1126 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1127 |> restrict_instance(opts)
1128 |> restrict_announce_object_actor(opts)
1129 |> Activity.restrict_deactivated_users()
1130 |> exclude_poll_votes(opts)
1131 |> exclude_chat_messages(opts)
1132 |> exclude_invisible_actors(opts)
1133 |> exclude_visibility(opts)
1136 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1137 list_memberships = Pleroma.List.memberships(opts[:user])
1139 fetch_activities_query(recipients ++ list_memberships, opts)
1140 |> Pagination.fetch_paginated(opts, pagination)
1142 |> maybe_update_cc(list_memberships, opts[:user])
1146 Fetch favorites activities of user with order by sort adds to favorites
1148 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1149 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1151 |> Activity.Queries.by_actor()
1152 |> Activity.Queries.by_type("Like")
1153 |> Activity.with_joined_object()
1154 |> Object.with_joined_activity()
1155 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1156 |> order_by([like, _, _], desc_nulls_last: like.id)
1157 |> Pagination.fetch_paginated(
1158 Map.merge(params, %{skip_order: true}),
1163 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1164 Enum.map(activities, fn
1165 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1166 if Enum.any?(bcc, &(&1 in list_memberships)) do
1167 update_in(activity.data["cc"], &[user_ap_id | &1])
1177 defp maybe_update_cc(activities, _, _), do: activities
1179 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1180 from(activity in query,
1182 fragment("? && ?", activity.recipients, ^recipients) or
1183 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1184 ^Constants.as_public() in activity.recipients)
1188 def fetch_activities_bounded(
1190 recipients_with_public,
1192 pagination \\ :keyset
1194 fetch_activities_query([], opts)
1195 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1196 |> Pagination.fetch_paginated(opts, pagination)
1200 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1201 def upload(file, opts \\ []) do
1202 with {:ok, data} <- Upload.store(file, opts) do
1203 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1205 Repo.insert(%Object{data: obj_data})
1209 @spec get_actor_url(any()) :: binary() | nil
1210 defp get_actor_url(url) when is_binary(url), do: url
1211 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1213 defp get_actor_url(url) when is_list(url) do
1219 defp get_actor_url(_url), do: nil
1221 defp object_to_user_data(data) do
1223 data["icon"]["url"] &&
1226 "url" => [%{"href" => data["icon"]["url"]}]
1230 data["image"]["url"] &&
1233 "url" => [%{"href" => data["image"]["url"]}]
1238 |> Map.get("attachment", [])
1239 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1240 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1244 |> Map.get("tag", [])
1246 %{"type" => "Emoji"} -> true
1249 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1250 {String.trim(name, ":"), url}
1253 locked = data["manuallyApprovesFollowers"] || false
1254 data = Transmogrifier.maybe_fix_user_object(data)
1255 discoverable = data["discoverable"] || false
1256 invisible = data["invisible"] || false
1257 actor_type = data["type"] || "Person"
1260 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1261 data["publicKey"]["publicKeyPem"]
1267 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1268 data["endpoints"]["sharedInbox"]
1275 uri: get_actor_url(data["url"]),
1281 discoverable: discoverable,
1282 invisible: invisible,
1285 follower_address: data["followers"],
1286 following_address: data["following"],
1287 bio: data["summary"],
1288 actor_type: actor_type,
1289 also_known_as: Map.get(data, "alsoKnownAs", []),
1290 public_key: public_key,
1291 inbox: data["inbox"],
1292 shared_inbox: shared_inbox
1295 # nickname can be nil because of virtual actors
1296 if data["preferredUsername"] do
1300 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1303 Map.put(user_data, :nickname, nil)
1307 def fetch_follow_information_for_user(user) do
1308 with {:ok, following_data} <-
1309 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1310 {:ok, hide_follows} <- collection_private(following_data),
1311 {:ok, followers_data} <-
1312 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1313 {:ok, hide_followers} <- collection_private(followers_data) do
1316 hide_follows: hide_follows,
1317 follower_count: normalize_counter(followers_data["totalItems"]),
1318 following_count: normalize_counter(following_data["totalItems"]),
1319 hide_followers: hide_followers
1322 {:error, _} = e -> e
1327 defp normalize_counter(counter) when is_integer(counter), do: counter
1328 defp normalize_counter(_), do: 0
1330 def maybe_update_follow_information(user_data) do
1331 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1332 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1334 {:collections_available,
1335 !!(user_data[:following_address] && user_data[:follower_address])},
1337 fetch_follow_information_for_user(user_data) do
1338 info = Map.merge(user_data[:info] || %{}, info)
1341 |> Map.put(:info, info)
1343 {:user_type_check, false} ->
1346 {:collections_available, false} ->
1349 {:enabled, false} ->
1354 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1361 defp collection_private(%{"first" => %{"type" => type}})
1362 when type in ["CollectionPage", "OrderedCollectionPage"],
1365 defp collection_private(%{"first" => first}) do
1366 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1367 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1370 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1371 {:error, _} = e -> e
1376 defp collection_private(_data), do: {:ok, true}
1378 def user_data_from_user_object(data) do
1379 with {:ok, data} <- MRF.filter(data) do
1380 {:ok, object_to_user_data(data)}
1386 def fetch_and_prepare_user_from_ap_id(ap_id) do
1387 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1388 {:ok, data} <- user_data_from_user_object(data) do
1389 {:ok, maybe_update_follow_information(data)}
1391 {:error, "Object has been deleted" = e} ->
1392 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1396 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1401 def make_user_from_ap_id(ap_id) do
1402 user = User.get_cached_by_ap_id(ap_id)
1404 if user && !User.ap_enabled?(user) do
1405 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1407 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1410 |> User.remote_user_changeset(data)
1411 |> User.update_and_set_cache()
1414 |> User.remote_user_changeset()
1422 def make_user_from_nickname(nickname) do
1423 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1424 make_user_from_ap_id(ap_id)
1426 _e -> {:error, "No AP id in WebFinger"}
1430 # filter out broken threads
1431 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1432 entire_thread_visible_for_user?(activity, user)
1435 # do post-processing on a specific activity
1436 def contain_activity(%Activity{} = activity, %User{} = user) do
1437 contain_broken_threads(activity, user)
1440 def fetch_direct_messages_query do
1442 |> restrict_type(%{type: "Create"})
1443 |> restrict_visibility(%{visibility: "direct"})
1444 |> order_by([activity], asc: activity.id)