1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Returns `true` when the activity's embedded object content is no longer
# than the configured `[:instance, :remote_limit]`. Activities without an
# embedded object carrying non-nil content always pass.
defp check_remote_limit(activity) do
  case activity do
    %{"object" => %{"content" => text}} when not is_nil(text) ->
      max_length = Config.get([:instance, :remote_limit])
      String.length(text) <= max_length

    _ ->
      true
  end
end
# Calls `User.increase_note_count/1` for `actor` when `is_public?/1` holds for
# `object`; otherwise hands the actor back untouched as `{:ok, actor}`.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
92 Object.increase_vote_count(reply_ap_id, name, actor)
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
109 Repo.insert(%Activity{
112 recipients: recipients,
113 actor: object["actor"]
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
135 recipients: recipients
138 |> maybe_create_activity_expiration()
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
147 %Activity{} = activity ->
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
155 recipients: recipients,
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
173 stream_out_participations(participations)
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
182 defp maybe_create_activity_expiration(result), do: result
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
192 defp get_participations({:ok, conversation}) do
194 |> Repo.preload(:participations, force: true)
195 |> Map.get(:participations)
198 defp get_participations(_), do: []
200 def stream_out_participations(participations) do
203 |> Repo.preload(:user)
205 Streamer.stream("participation", participations)
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
213 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
224 def stream_out_participations(_, _), do: :noop
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
233 def stream_out(_activity) do
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
253 %{to: to, actor: actor, published: published, context: context, object: object},
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
267 {:quick_insert, true, activity} ->
270 {:fake, true, activity} ->
274 Repo.rollback(message)
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
287 %{to: to, actor: actor, published: published, context: context, object: object},
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
@doc """
Creates an `Accept` activity by delegating to `accept_or_reject/2`.

`params` must carry `:to`, `:actor` and `:object`; `:local` and
`:activity_id` are optional.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
@doc """
Creates a `Reject` activity by delegating to `accept_or_reject/2`.

`params` must carry `:to`, `:actor` and `:object`; `:local` and
`:activity_id` are optional.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
325 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
326 local = !(params[:local] == false)
327 activity_id = params[:activity_id]
337 |> Maps.put_if_present("id", activity_id)
339 with {:ok, activity} <- insert(data, local),
340 _ <- notify_and_stream(activity),
341 :ok <- maybe_federate(activity) do
346 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
347 {:ok, Activity.t()} | {:error, any()}
348 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
355 defp do_follow(follower, followed, activity_id, local, opts) do
356 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
357 data = make_follow_data(follower, followed, activity_id)
359 with {:ok, activity} <- insert(data, local),
360 _ <- skip_notify_and_stream || notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
364 {:error, error} -> Repo.rollback(error)
368 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
369 {:ok, Activity.t()} | nil | {:error, any()}
370 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
371 with {:ok, result} <-
372 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
377 defp do_unfollow(follower, followed, activity_id, local) do
378 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
379 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
380 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
381 {:ok, activity} <- insert(unfollow_data, local),
382 _ <- notify_and_stream(activity),
383 :ok <- maybe_federate(activity) do
387 {:error, error} -> Repo.rollback(error)
391 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
392 {:ok, Activity.t()} | {:error, any()}
393 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
394 with {:ok, result} <-
395 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
400 defp do_block(blocker, blocked, activity_id, local) do
401 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
403 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
404 unfollow(blocker, blocked, nil, local)
407 block_data = make_block_data(blocker, blocked, activity_id)
409 with {:ok, activity} <- insert(block_data, local),
410 _ <- notify_and_stream(activity),
411 :ok <- maybe_federate(activity) do
414 {:error, error} -> Repo.rollback(error)
418 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
428 # only accept false as false value
429 local = !(params[:local] == false)
430 forward = !(params[:forward] == false)
432 additional = params[:additional] || %{}
436 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
438 Map.merge(additional, %{"to" => [], "cc" => []})
441 with flag_data <- make_flag_data(params, additional),
442 {:ok, activity} <- insert(flag_data, local),
443 {:ok, stripped_activity} <- strip_report_status_data(activity),
444 _ <- notify_and_stream(activity),
445 :ok <- maybe_federate(stripped_activity) do
446 User.all_superusers()
447 |> Enum.filter(fn user -> not is_nil(user.email) end)
448 |> Enum.each(fn superuser ->
450 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
451 |> Pleroma.Emails.Mailer.deliver_async()
458 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
459 def move(%User{} = origin, %User{} = target, local \\ true) do
462 "actor" => origin.ap_id,
463 "object" => origin.ap_id,
464 "target" => target.ap_id
467 with true <- origin.ap_id in target.also_known_as,
468 {:ok, activity} <- insert(params, local),
469 _ <- notify_and_stream(activity) do
470 maybe_federate(activity)
472 BackgroundWorker.enqueue("move_following", %{
473 "origin_id" => origin.id,
474 "target_id" => target.id
479 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
484 def fetch_activities_for_context_query(context, opts) do
485 public = [Constants.as_public()]
489 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
492 from(activity in Activity)
493 |> maybe_preload_objects(opts)
494 |> maybe_preload_bookmarks(opts)
495 |> maybe_set_thread_muted_field(opts)
496 |> restrict_blocked(opts)
497 |> restrict_recipients(recipients, opts[:user])
501 "?->>'type' = ? and ?->>'context' = ?",
508 |> exclude_poll_votes(opts)
510 |> order_by([activity], desc: activity.id)
513 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
514 def fetch_activities_for_context(context, opts \\ %{}) do
516 |> fetch_activities_for_context_query(opts)
520 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
521 FlakeId.Ecto.CompatType.t() | nil
522 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
524 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
525 |> restrict_visibility(%{visibility: "direct"})
@doc """
Fetches a page of activities whose recipients include the public collection.

Any `:user` entry in `opts` is dropped before the query is built. Unlisted
activities are filtered out only when `opts` carries `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
541 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
542 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
544 |> Map.put(:restrict_unlisted, true)
545 |> fetch_public_or_unlisted_activities(pagination)
548 @valid_visibilities ~w[direct unlisted public private]
550 defp restrict_visibility(query, %{visibility: visibility})
551 when is_list(visibility) do
552 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
557 "activity_visibility(?, ?, ?) = ANY (?)",
565 Logger.error("Could not restrict visibility to #{visibility}")
569 defp restrict_visibility(query, %{visibility: visibility})
570 when visibility in @valid_visibilities do
574 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
578 defp restrict_visibility(_query, %{visibility: visibility})
579 when visibility not in @valid_visibilities do
580 Logger.error("Could not restrict visibility to #{visibility}")
583 defp restrict_visibility(query, _visibility), do: query
585 defp exclude_visibility(query, %{exclude_visibilities: visibility})
586 when is_list(visibility) do
587 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
592 "activity_visibility(?, ?, ?) = ANY (?)",
600 Logger.error("Could not exclude visibility to #{visibility}")
605 defp exclude_visibility(query, %{exclude_visibilities: visibility})
606 when visibility in @valid_visibilities do
611 "activity_visibility(?, ?, ?) = ?",
620 defp exclude_visibility(query, %{exclude_visibilities: visibility})
621 when visibility not in [nil | @valid_visibilities] do
622 Logger.error("Could not exclude visibility to #{visibility}")
626 defp exclude_visibility(query, _visibility), do: query
628 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
631 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
634 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
637 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
641 defp restrict_thread_visibility(query, _, _), do: query
643 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
646 |> Map.put(:user, reading_user)
647 |> Map.put(:actor_id, user.ap_id)
650 godmode: params[:godmode],
651 reading_user: reading_user
653 |> user_activities_recipients()
654 |> fetch_activities(params)
658 def fetch_user_activities(user, reading_user, params \\ %{}) do
661 |> Map.put(:type, ["Create", "Announce"])
662 |> Map.put(:user, reading_user)
663 |> Map.put(:actor_id, user.ap_id)
664 |> Map.put(:pinned_activity_ids, user.pinned_activities)
667 if User.blocks?(reading_user, user) do
671 |> Map.put(:blocking_user, reading_user)
672 |> Map.put(:muting_user, reading_user)
676 godmode: params[:godmode],
677 reading_user: reading_user
679 |> user_activities_recipients()
680 |> fetch_activities(params)
684 def fetch_statuses(reading_user, params) do
685 params = Map.put(params, :type, ["Create", "Announce"])
688 godmode: params[:godmode],
689 reading_user: reading_user
691 |> user_activities_recipients()
692 |> fetch_activities(params, :offset)
696 defp user_activities_recipients(%{godmode: true}), do: []
698 defp user_activities_recipients(%{reading_user: reading_user}) do
700 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
702 [Constants.as_public()]
706 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
707 raise "Can't use the child object without preloading!"
710 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
712 [activity, object] in query,
715 "?->>'type' != ? or ?->>'actor' != ?",
724 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
# A missing or empty-string `:since_id` leaves the query untouched.
defp restrict_since(query, %{since_id: since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
734 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
735 raise "Can't use the child object without preloading!"
738 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
740 [_activity, object] in query,
741 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
745 defp restrict_tag_reject(query, _), do: query
747 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
748 raise "Can't use the child object without preloading!"
751 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
753 [_activity, object] in query,
754 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
758 defp restrict_tag_all(query, _), do: query
760 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
761 raise "Can't use the child object without preloading!"
764 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
766 [_activity, object] in query,
767 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
771 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
773 [_activity, object] in query,
774 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
778 defp restrict_tag(query, _), do: query
780 defp restrict_recipients(query, [], _user), do: query
782 defp restrict_recipients(query, recipients, nil) do
783 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
786 defp restrict_recipients(query, recipients, user) do
789 where: fragment("? && ?", ^recipients, activity.recipients),
790 or_where: activity.actor == ^user.ap_id
# Limits the query to activities flagged `local` when `local_only: true` is
# requested; any other options leave the query unchanged.
defp restrict_local(query, opts) do
  case opts do
    %{local_only: true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
# Limits the query to activities authored by `:actor_id` when that option is
# present; otherwise the query is returned as-is.
defp restrict_actor(query, opts) do
  case opts do
    %{actor_id: actor_id} -> from(a in query, where: a.actor == ^actor_id)
    _ -> query
  end
end
# Filters on the activity's JSON `type` field. A binary `:type` is matched
# for equality, any other `:type` value is matched with `ANY(...)`, and the
# query is left alone when the option is absent.
defp restrict_type(query, opts) do
  case opts do
    %{type: type} when is_binary(type) ->
      from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))

    %{type: types} ->
      from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))

    _ ->
      query
  end
end
# Filters on the activity's JSON `state` field when `:state` is given;
# otherwise the query is returned unchanged.
defp restrict_state(query, opts) do
  case opts do
    %{state: state} ->
      from(a in query, where: fragment("?->>'state' = ?", a.data, ^state))

    _ ->
      query
  end
end
822 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
824 [_activity, object] in query,
825 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
829 defp restrict_favorited_by(query, _), do: query
831 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
832 raise "Can't use the child object without preloading!"
835 defp restrict_media(query, %{only_media: true}) do
837 [activity, object] in query,
838 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
839 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
843 defp restrict_media(query, _), do: query
845 defp restrict_replies(query, %{exclude_replies: true}) do
847 [_activity, object] in query,
848 where: fragment("?->>'inReplyTo' is null", object.data)
852 defp restrict_replies(query, %{
853 reply_filtering_user: user,
854 reply_visibility: "self"
857 [activity, object] in query,
860 "?->>'inReplyTo' is null OR ? = ANY(?)",
868 defp restrict_replies(query, %{
869 reply_filtering_user: user,
870 reply_visibility: "following"
873 [activity, object] in query,
876 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
878 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
887 defp restrict_replies(query, _), do: query
# Drops `Announce` activities from the query when `exclude_reblogs: true` is
# requested; any other options leave the query unchanged.
defp restrict_reblogs(query, opts) do
  case opts do
    %{exclude_reblogs: true} ->
      from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

    _ ->
      query
  end
end
895 defp restrict_muted(query, %{with_muted: true}), do: query
897 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
898 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
901 from([activity] in query,
902 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
903 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
906 unless opts[:skip_preload] do
907 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
913 defp restrict_muted(query, _), do: query
915 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
916 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
917 domain_blocks = user.domain_blocks || []
919 following_ap_ids = User.get_friends_ap_ids(user)
922 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
925 [activity, object: o] in query,
926 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
927 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
930 "recipients_contain_blocked_domains(?, ?) = false",
936 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
943 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
951 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
960 defp restrict_blocked(query, _), do: query
962 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
967 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
969 ^[Constants.as_public()]
974 defp restrict_unlisted(query, _), do: query
# Limits the query to the ids in `:pinned_activity_ids`, but only when
# `pinned: true` is also set; otherwise the query is returned unchanged.
defp restrict_pinned(query, opts) do
  case opts do
    %{pinned: true, pinned_activity_ids: ids} -> from(a in query, where: a.id in ^ids)
    _ -> query
  end
end
982 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
983 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
989 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
997 defp restrict_muted_reblogs(query, _), do: query
999 defp restrict_instance(query, %{instance: instance}) do
1004 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1008 from(activity in query, where: activity.actor in ^users)
1011 defp restrict_instance(query, _), do: query
1013 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1015 defp exclude_poll_votes(query, _) do
1016 if has_named_binding?(query, :object) do
1017 from([activity, object: o] in query,
1018 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1025 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1027 defp exclude_chat_messages(query, _) do
1028 if has_named_binding?(query, :object) do
1029 from([activity, object: o] in query,
1030 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1037 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1039 defp exclude_invisible_actors(query, _opts) do
1041 User.Query.build(%{invisible: true, select: [:ap_id]})
1043 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1045 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity with the given binary `:exclude_id` from the query.
# A missing or non-binary value leaves the query unchanged.
defp exclude_id(query, opts) do
  case opts do
    %{exclude_id: id} when is_binary(id) -> from(a in query, where: a.id != ^id)
    _ -> query
  end
end
1054 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1056 defp maybe_preload_objects(query, _) do
1058 |> Activity.with_preloaded_object()
1061 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1063 defp maybe_preload_bookmarks(query, opts) do
1065 |> Activity.with_preloaded_bookmark(opts[:user])
1068 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1070 |> Activity.with_preloaded_report_notes()
1073 defp maybe_preload_report_notes(query, _), do: query
1075 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1077 defp maybe_set_thread_muted_field(query, opts) do
1079 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1082 defp maybe_order(query, %{order: :desc}) do
1084 |> order_by(desc: :id)
1087 defp maybe_order(query, %{order: :asc}) do
1089 |> order_by(asc: :id)
1092 defp maybe_order(query, _), do: query
1094 defp fetch_activities_query_ap_ids_ops(opts) do
1095 source_user = opts[:muting_user]
1096 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1098 ap_id_relationships =
1099 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1100 [:block | ap_id_relationships]
1105 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1107 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1108 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1110 restrict_muted_reblogs_opts =
1111 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1113 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1116 def fetch_activities_query(recipients, opts \\ %{}) do
1117 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1118 fetch_activities_query_ap_ids_ops(opts)
1121 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1125 |> maybe_preload_objects(opts)
1126 |> maybe_preload_bookmarks(opts)
1127 |> maybe_preload_report_notes(opts)
1128 |> maybe_set_thread_muted_field(opts)
1129 |> maybe_order(opts)
1130 |> restrict_recipients(recipients, opts[:user])
1131 |> restrict_replies(opts)
1132 |> restrict_tag(opts)
1133 |> restrict_tag_reject(opts)
1134 |> restrict_tag_all(opts)
1135 |> restrict_since(opts)
1136 |> restrict_local(opts)
1137 |> restrict_actor(opts)
1138 |> restrict_type(opts)
1139 |> restrict_state(opts)
1140 |> restrict_favorited_by(opts)
1141 |> restrict_blocked(restrict_blocked_opts)
1142 |> restrict_muted(restrict_muted_opts)
1143 |> restrict_media(opts)
1144 |> restrict_visibility(opts)
1145 |> restrict_thread_visibility(opts, config)
1146 |> restrict_reblogs(opts)
1147 |> restrict_pinned(opts)
1148 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1149 |> restrict_instance(opts)
1150 |> restrict_announce_object_actor(opts)
1151 |> Activity.restrict_deactivated_users()
1152 |> exclude_poll_votes(opts)
1153 |> exclude_chat_messages(opts)
1154 |> exclude_invisible_actors(opts)
1155 |> exclude_visibility(opts)
1158 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1159 list_memberships = Pleroma.List.memberships(opts[:user])
1161 fetch_activities_query(recipients ++ list_memberships, opts)
1162 |> Pagination.fetch_paginated(opts, pagination)
1164 |> maybe_update_cc(list_memberships, opts[:user])
Fetches the activities a user has favourited, ordered by when they were added to favourites (most recent first).
1170 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1171 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1173 |> Activity.Queries.by_actor()
1174 |> Activity.Queries.by_type("Like")
1175 |> Activity.with_joined_object()
1176 |> Object.with_joined_activity()
1177 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1178 |> order_by([like, _, _], desc_nulls_last: like.id)
1179 |> Pagination.fetch_paginated(
1180 Map.merge(params, %{skip_order: true}),
1185 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1186 Enum.map(activities, fn
1187 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1188 if Enum.any?(bcc, &(&1 in list_memberships)) do
1189 update_in(activity.data["cc"], &[user_ap_id | &1])
1199 defp maybe_update_cc(activities, _, _), do: activities
1201 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1202 from(activity in query,
1204 fragment("? && ?", activity.recipients, ^recipients) or
1205 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1206 ^Constants.as_public() in activity.recipients)
1210 def fetch_activities_bounded(
1212 recipients_with_public,
1214 pagination \\ :keyset
1216 fetch_activities_query([], opts)
1217 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1218 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Stores `file` through `Upload.store/2` and inserts an object built from the
stored data.

When `opts[:actor]` is present it is recorded under the `"actor"` key of the
object data. Any result from `Upload.store/2` other than `{:ok, data}` is
returned to the caller unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      %Object{data: Maps.put_if_present(data, "actor", opts[:actor])}
      |> Repo.insert()

    other ->
      other
  end
end
1231 @spec get_actor_url(any()) :: binary() | nil
1232 defp get_actor_url(url) when is_binary(url), do: url
1233 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1235 defp get_actor_url(url) when is_list(url) do
1241 defp get_actor_url(_url), do: nil
1243 defp object_to_user_data(data) do
1245 data["icon"]["url"] &&
1248 "url" => [%{"href" => data["icon"]["url"]}]
1252 data["image"]["url"] &&
1255 "url" => [%{"href" => data["image"]["url"]}]
1260 |> Map.get("attachment", [])
1261 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1262 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1266 |> Map.get("tag", [])
1268 %{"type" => "Emoji"} -> true
1271 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1272 {String.trim(name, ":"), url}
1275 locked = data["manuallyApprovesFollowers"] || false
1276 data = Transmogrifier.maybe_fix_user_object(data)
1277 discoverable = data["discoverable"] || false
1278 invisible = data["invisible"] || false
1279 actor_type = data["type"] || "Person"
1282 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1283 data["publicKey"]["publicKeyPem"]
1289 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1290 data["endpoints"]["sharedInbox"]
1297 uri: get_actor_url(data["url"]),
1303 discoverable: discoverable,
1304 invisible: invisible,
1307 follower_address: data["followers"],
1308 following_address: data["following"],
1309 bio: data["summary"],
1310 actor_type: actor_type,
1311 also_known_as: Map.get(data, "alsoKnownAs", []),
1312 public_key: public_key,
1313 inbox: data["inbox"],
1314 shared_inbox: shared_inbox
1317 # nickname can be nil because of virtual actors
1318 if data["preferredUsername"] do
1322 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1325 Map.put(user_data, :nickname, nil)
1329 def fetch_follow_information_for_user(user) do
1330 with {:ok, following_data} <-
1331 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1332 {:ok, hide_follows} <- collection_private(following_data),
1333 {:ok, followers_data} <-
1334 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1335 {:ok, hide_followers} <- collection_private(followers_data) do
1338 hide_follows: hide_follows,
1339 follower_count: normalize_counter(followers_data["totalItems"]),
1340 following_count: normalize_counter(following_data["totalItems"]),
1341 hide_followers: hide_followers
1344 {:error, _} = e -> e
# Passes integer counters through unchanged and maps anything else to 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1352 def maybe_update_follow_information(user_data) do
1353 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1354 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1356 {:collections_available,
1357 !!(user_data[:following_address] && user_data[:follower_address])},
1359 fetch_follow_information_for_user(user_data) do
1360 info = Map.merge(user_data[:info] || %{}, info)
1363 |> Map.put(:info, info)
1365 {:user_type_check, false} ->
1368 {:collections_available, false} ->
1371 {:enabled, false} ->
1376 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1383 defp collection_private(%{"first" => %{"type" => type}})
1384 when type in ["CollectionPage", "OrderedCollectionPage"],
1387 defp collection_private(%{"first" => first}) do
1388 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1389 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1392 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1393 {:error, _} = e -> e
1398 defp collection_private(_data), do: {:ok, true}
1400 def user_data_from_user_object(data) do
1401 with {:ok, data} <- MRF.filter(data) do
1402 {:ok, object_to_user_data(data)}
1408 def fetch_and_prepare_user_from_ap_id(ap_id) do
1409 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1410 {:ok, data} <- user_data_from_user_object(data) do
1411 {:ok, maybe_update_follow_information(data)}
1413 {:error, "Object has been deleted" = e} ->
1414 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1418 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1423 def make_user_from_ap_id(ap_id) do
1424 user = User.get_cached_by_ap_id(ap_id)
1426 if user && !User.ap_enabled?(user) do
1427 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1429 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1432 |> User.remote_user_changeset(data)
1433 |> User.update_and_set_cache()
1436 |> User.remote_user_changeset()
1444 def make_user_from_nickname(nickname) do
1445 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1446 make_user_from_ap_id(ap_id)
1448 _e -> {:error, "No AP id in WebFinger"}
# Filters out broken threads: returns whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processes a single activity on behalf of `user`.

Currently this only runs the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1462 def fetch_direct_messages_query do
1464 |> restrict_type(%{type: "Create"})
1465 |> restrict_visibility(%{visibility: "direct"})
1466 |> order_by([activity], asc: activity.id)