1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = User.get_cached_by_ap_id(data["actor"])
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
47 user -> User.following?(user, actor)
54 defp get_recipients(%{"type" => "Create"} = data) do
55 to = Map.get(data, "to", [])
56 cc = Map.get(data, "cc", [])
57 bcc = Map.get(data, "bcc", [])
58 actor = Map.get(data, "actor", [])
59 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
63 defp get_recipients(data) do
64 to = Map.get(data, "to", [])
65 cc = Map.get(data, "cc", [])
66 bcc = Map.get(data, "bcc", [])
67 recipients = Enum.concat([to, cc, bcc])
71 defp check_actor_is_active(nil), do: true
73 defp check_actor_is_active(actor) when is_binary(actor) do
74 case User.get_cached_by_ap_id(actor) do
75 %User{deactivated: deactivated} -> not deactivated
80 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
81 limit = Config.get([:instance, :remote_limit])
82 String.length(content) <= limit
85 defp check_remote_limit(_), do: true
87 defp increase_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
91 def decrease_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
95 defp increase_replies_count_if_reply(%{
96 "object" => %{"inReplyTo" => reply_ap_id} = object,
99 if is_public?(object) do
100 Object.increase_replies_count(reply_ap_id)
104 defp increase_replies_count_if_reply(_create_data), do: :noop
106 defp increase_poll_votes_if_vote(%{
107 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
111 Object.increase_vote_count(reply_ap_id, name, actor)
114 defp increase_poll_votes_if_vote(_create_data), do: :noop
116 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
117 def persist(object, meta) do
118 with local <- Keyword.fetch!(meta, :local),
119 {recipients, _, _} <- get_recipients(object),
121 Repo.insert(%Activity{
124 recipients: recipients,
125 actor: object["actor"]
127 {:ok, activity, meta}
131 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
132 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
133 with nil <- Activity.normalize(map),
134 map <- lazy_put_activity_defaults(map, fake),
135 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
136 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
137 {:ok, map} <- MRF.filter(map),
138 {recipients, _, _} = get_recipients(map),
139 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
140 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
141 {:ok, map, object} <- insert_full_object(map) do
147 recipients: recipients
150 |> maybe_create_activity_expiration()
152 # Splice in the child object if we have one.
153 activity = Maps.put_if_present(activity, :object, object)
155 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
159 %Activity{} = activity ->
162 {:fake, true, map, recipients} ->
163 activity = %Activity{
167 recipients: recipients,
171 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
179 def notify_and_stream(activity) do
180 Notification.create_notifications(activity)
182 conversation = create_or_bump_conversation(activity, activity.actor)
183 participations = get_participations(conversation)
185 stream_out_participations(participations)
188 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
189 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
194 defp maybe_create_activity_expiration(result), do: result
196 defp create_or_bump_conversation(activity, actor) do
197 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
198 %User{} = user <- User.get_cached_by_ap_id(actor) do
199 Participation.mark_as_read(user, conversation)
204 defp get_participations({:ok, conversation}) do
206 |> Repo.preload(:participations, force: true)
207 |> Map.get(:participations)
210 defp get_participations(_), do: []
212 def stream_out_participations(participations) do
215 |> Repo.preload(:user)
217 Streamer.stream("participation", participations)
220 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
221 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
222 conversation = Repo.preload(conversation, :participations)
225 fetch_latest_activity_id_for_context(conversation.ap_id, %{
230 if last_activity_id do
231 stream_out_participations(conversation.participations)
236 def stream_out_participations(_, _), do: :noop
238 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
239 when data_type in ["Create", "Announce", "Delete"] do
241 |> Topics.get_activity_topics()
242 |> Streamer.stream(activity)
245 def stream_out(_activity) do
249 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
250 def create(params, fake \\ false) do
251 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
256 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
257 additional = params[:additional] || %{}
258 # only accept false as false value
259 local = !(params[:local] == false)
260 published = params[:published]
261 quick_insert? = Config.get([:env]) == :benchmark
265 %{to: to, actor: actor, published: published, context: context, object: object},
269 with {:ok, activity} <- insert(create_data, local, fake),
270 {:fake, false, activity} <- {:fake, fake, activity},
271 _ <- increase_replies_count_if_reply(create_data),
272 _ <- increase_poll_votes_if_vote(create_data),
273 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
274 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
275 _ <- notify_and_stream(activity),
276 :ok <- maybe_federate(activity) do
279 {:quick_insert, true, activity} ->
282 {:fake, true, activity} ->
286 Repo.rollback(message)
290 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
291 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
292 additional = params[:additional] || %{}
293 # only accept false as false value
294 local = !(params[:local] == false)
295 published = params[:published]
299 %{to: to, actor: actor, published: published, context: context, object: object},
303 with {:ok, activity} <- insert(listen_data, local),
304 _ <- notify_and_stream(activity),
305 :ok <- maybe_federate(activity) do
310 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
311 def accept(params) do
312 accept_or_reject("Accept", params)
315 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
316 def reject(params) do
317 accept_or_reject("Reject", params)
320 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
321 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
322 local = Map.get(params, :local, true)
323 activity_id = Map.get(params, :activity_id, nil)
326 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
327 |> Maps.put_if_present("id", activity_id)
329 with {:ok, activity} <- insert(data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
336 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
337 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
338 local = !(params[:local] == false)
339 activity_id = params[:activity_id]
349 |> Maps.put_if_present("id", activity_id)
351 with {:ok, activity} <- insert(data, local),
352 _ <- notify_and_stream(activity),
353 :ok <- maybe_federate(activity) do
358 @spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
359 {:ok, Activity.t()} | {:error, any()}
360 def follow(follower, followed, activity_id \\ nil, local \\ true) do
361 with {:ok, result} <-
362 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
367 defp do_follow(follower, followed, activity_id, local) do
368 data = make_follow_data(follower, followed, activity_id)
370 with {:ok, activity} <- insert(data, local),
371 _ <- notify_and_stream(activity),
372 :ok <- maybe_federate(activity) do
375 {:error, error} -> Repo.rollback(error)
379 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
380 {:ok, Activity.t()} | nil | {:error, any()}
381 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
382 with {:ok, result} <-
383 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
388 defp do_unfollow(follower, followed, activity_id, local) do
389 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
390 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
391 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
392 {:ok, activity} <- insert(unfollow_data, local),
393 _ <- notify_and_stream(activity),
394 :ok <- maybe_federate(activity) do
398 {:error, error} -> Repo.rollback(error)
402 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
403 {:ok, Activity.t()} | {:error, any()}
404 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
405 with {:ok, result} <-
406 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
411 defp do_block(blocker, blocked, activity_id, local) do
412 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
414 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
415 unfollow(blocker, blocked, nil, local)
418 block_data = make_block_data(blocker, blocked, activity_id)
420 with {:ok, activity} <- insert(block_data, local),
421 _ <- notify_and_stream(activity),
422 :ok <- maybe_federate(activity) do
425 {:error, error} -> Repo.rollback(error)
429 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
439 # only accept false as false value
440 local = !(params[:local] == false)
441 forward = !(params[:forward] == false)
443 additional = params[:additional] || %{}
447 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
449 Map.merge(additional, %{"to" => [], "cc" => []})
452 with flag_data <- make_flag_data(params, additional),
453 {:ok, activity} <- insert(flag_data, local),
454 {:ok, stripped_activity} <- strip_report_status_data(activity),
455 _ <- notify_and_stream(activity),
456 :ok <- maybe_federate(stripped_activity) do
457 User.all_superusers()
458 |> Enum.filter(fn user -> not is_nil(user.email) end)
459 |> Enum.each(fn superuser ->
461 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
462 |> Pleroma.Emails.Mailer.deliver_async()
469 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
470 def move(%User{} = origin, %User{} = target, local \\ true) do
473 "actor" => origin.ap_id,
474 "object" => origin.ap_id,
475 "target" => target.ap_id
478 with true <- origin.ap_id in target.also_known_as,
479 {:ok, activity} <- insert(params, local),
480 _ <- notify_and_stream(activity) do
481 maybe_federate(activity)
483 BackgroundWorker.enqueue("move_following", %{
484 "origin_id" => origin.id,
485 "target_id" => target.id
490 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
495 def fetch_activities_for_context_query(context, opts) do
496 public = [Constants.as_public()]
500 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
503 from(activity in Activity)
504 |> maybe_preload_objects(opts)
505 |> maybe_preload_bookmarks(opts)
506 |> maybe_set_thread_muted_field(opts)
507 |> restrict_blocked(opts)
508 |> restrict_recipients(recipients, opts[:user])
512 "?->>'type' = ? and ?->>'context' = ?",
519 |> exclude_poll_votes(opts)
521 |> order_by([activity], desc: activity.id)
524 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
525 def fetch_activities_for_context(context, opts \\ %{}) do
527 |> fetch_activities_for_context_query(opts)
531 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
532 FlakeId.Ecto.CompatType.t() | nil
533 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
535 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
541 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
542 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
543 opts = Map.delete(opts, :user)
545 [Constants.as_public()]
546 |> fetch_activities_query(opts)
547 |> restrict_unlisted(opts)
548 |> Pagination.fetch_paginated(opts, pagination)
551 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
552 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
554 |> Map.put(:restrict_unlisted, true)
555 |> fetch_public_or_unlisted_activities(pagination)
558 @valid_visibilities ~w[direct unlisted public private]
560 defp restrict_visibility(query, %{visibility: visibility})
561 when is_list(visibility) do
562 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
567 "activity_visibility(?, ?, ?) = ANY (?)",
575 Logger.error("Could not restrict visibility to #{visibility}")
579 defp restrict_visibility(query, %{visibility: visibility})
580 when visibility in @valid_visibilities do
584 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
588 defp restrict_visibility(_query, %{visibility: visibility})
589 when visibility not in @valid_visibilities do
590 Logger.error("Could not restrict visibility to #{visibility}")
593 defp restrict_visibility(query, _visibility), do: query
595 defp exclude_visibility(query, %{exclude_visibilities: visibility})
596 when is_list(visibility) do
597 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
602 "activity_visibility(?, ?, ?) = ANY (?)",
610 Logger.error("Could not exclude visibility to #{visibility}")
615 defp exclude_visibility(query, %{exclude_visibilities: visibility})
616 when visibility in @valid_visibilities do
621 "activity_visibility(?, ?, ?) = ?",
630 defp exclude_visibility(query, %{exclude_visibilities: visibility})
631 when visibility not in [nil | @valid_visibilities] do
632 Logger.error("Could not exclude visibility to #{visibility}")
636 defp exclude_visibility(query, _visibility), do: query
638 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
641 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
644 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
647 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
651 defp restrict_thread_visibility(query, _, _), do: query
653 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
656 |> Map.put(:user, reading_user)
657 |> Map.put(:actor_id, user.ap_id)
660 godmode: params[:godmode],
661 reading_user: reading_user
663 |> user_activities_recipients()
664 |> fetch_activities(params)
668 def fetch_user_activities(user, reading_user, params \\ %{}) do
671 |> Map.put(:type, ["Create", "Announce"])
672 |> Map.put(:user, reading_user)
673 |> Map.put(:actor_id, user.ap_id)
674 |> Map.put(:pinned_activity_ids, user.pinned_activities)
677 if User.blocks?(reading_user, user) do
681 |> Map.put(:blocking_user, reading_user)
682 |> Map.put(:muting_user, reading_user)
686 godmode: params[:godmode],
687 reading_user: reading_user
689 |> user_activities_recipients()
690 |> fetch_activities(params)
694 def fetch_statuses(reading_user, params) do
695 params = Map.put(params, :type, ["Create", "Announce"])
698 godmode: params[:godmode],
699 reading_user: reading_user
701 |> user_activities_recipients()
702 |> fetch_activities(params, :offset)
706 defp user_activities_recipients(%{godmode: true}), do: []
708 defp user_activities_recipients(%{reading_user: reading_user}) do
710 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
712 [Constants.as_public()]
716 defp restrict_since(query, %{since_id: ""}), do: query
718 defp restrict_since(query, %{since_id: since_id}) do
719 from(activity in query, where: activity.id > ^since_id)
722 defp restrict_since(query, _), do: query
724 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
725 raise "Can't use the child object without preloading!"
728 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
730 [_activity, object] in query,
731 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
735 defp restrict_tag_reject(query, _), do: query
737 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
738 raise "Can't use the child object without preloading!"
741 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
743 [_activity, object] in query,
744 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
748 defp restrict_tag_all(query, _), do: query
750 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
751 raise "Can't use the child object without preloading!"
754 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
756 [_activity, object] in query,
757 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
761 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
763 [_activity, object] in query,
764 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
768 defp restrict_tag(query, _), do: query
770 defp restrict_recipients(query, [], _user), do: query
772 defp restrict_recipients(query, recipients, nil) do
773 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
776 defp restrict_recipients(query, recipients, user) do
779 where: fragment("? && ?", ^recipients, activity.recipients),
780 or_where: activity.actor == ^user.ap_id
784 defp restrict_local(query, %{local_only: true}) do
785 from(activity in query, where: activity.local == true)
788 defp restrict_local(query, _), do: query
790 defp restrict_actor(query, %{actor_id: actor_id}) do
791 from(activity in query, where: activity.actor == ^actor_id)
794 defp restrict_actor(query, _), do: query
796 defp restrict_type(query, %{type: type}) when is_binary(type) do
797 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
800 defp restrict_type(query, %{type: type}) do
801 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
804 defp restrict_type(query, _), do: query
806 defp restrict_state(query, %{state: state}) do
807 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
810 defp restrict_state(query, _), do: query
812 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
814 [_activity, object] in query,
815 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
819 defp restrict_favorited_by(query, _), do: query
821 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
822 raise "Can't use the child object without preloading!"
825 defp restrict_media(query, %{only_media: true}) do
827 [_activity, object] in query,
828 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
832 defp restrict_media(query, _), do: query
834 defp restrict_replies(query, %{exclude_replies: true}) do
836 [_activity, object] in query,
837 where: fragment("?->>'inReplyTo' is null", object.data)
841 defp restrict_replies(query, %{
842 reply_filtering_user: user,
843 reply_visibility: "self"
846 [activity, object] in query,
849 "?->>'inReplyTo' is null OR ? = ANY(?)",
857 defp restrict_replies(query, %{
858 reply_filtering_user: user,
859 reply_visibility: "following"
862 [activity, object] in query,
865 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
867 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
876 defp restrict_replies(query, _), do: query
878 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
879 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
882 defp restrict_reblogs(query, _), do: query
884 defp restrict_muted(query, %{with_muted: true}), do: query
886 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
887 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
890 from([activity] in query,
891 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
892 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
895 unless opts[:skip_preload] do
896 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
902 defp restrict_muted(query, _), do: query
904 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
905 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
906 domain_blocks = user.domain_blocks || []
908 following_ap_ids = User.get_friends_ap_ids(user)
911 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
914 [activity, object: o] in query,
915 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
916 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
919 "recipients_contain_blocked_domains(?, ?) = false",
925 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
932 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
940 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
949 defp restrict_blocked(query, _), do: query
951 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
956 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
958 ^[Constants.as_public()]
963 defp restrict_unlisted(query, _), do: query
965 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
966 from(activity in query, where: activity.id in ^ids)
969 defp restrict_pinned(query, _), do: query
971 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
972 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
978 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
986 defp restrict_muted_reblogs(query, _), do: query
988 defp restrict_instance(query, %{instance: instance}) do
993 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
997 from(activity in query, where: activity.actor in ^users)
1000 defp restrict_instance(query, _), do: query
1002 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1004 defp exclude_poll_votes(query, _) do
1005 if has_named_binding?(query, :object) do
1006 from([activity, object: o] in query,
1007 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1014 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1016 defp exclude_invisible_actors(query, _opts) do
1018 User.Query.build(%{invisible: true, select: [:ap_id]})
1020 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1022 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1025 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1026 from(activity in query, where: activity.id != ^id)
1029 defp exclude_id(query, _), do: query
1031 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1033 defp maybe_preload_objects(query, _) do
1035 |> Activity.with_preloaded_object()
1038 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1040 defp maybe_preload_bookmarks(query, opts) do
1042 |> Activity.with_preloaded_bookmark(opts[:user])
1045 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1047 |> Activity.with_preloaded_report_notes()
1050 defp maybe_preload_report_notes(query, _), do: query
1052 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1054 defp maybe_set_thread_muted_field(query, opts) do
1056 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1059 defp maybe_order(query, %{order: :desc}) do
1061 |> order_by(desc: :id)
1064 defp maybe_order(query, %{order: :asc}) do
1066 |> order_by(asc: :id)
1069 defp maybe_order(query, _), do: query
1071 defp fetch_activities_query_ap_ids_ops(opts) do
1072 source_user = opts[:muting_user]
1073 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1075 ap_id_relationships =
1076 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1077 [:block | ap_id_relationships]
1082 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1084 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1085 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1087 restrict_muted_reblogs_opts =
1088 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1090 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1093 def fetch_activities_query(recipients, opts \\ %{}) do
1094 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1095 fetch_activities_query_ap_ids_ops(opts)
1098 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1102 |> maybe_preload_objects(opts)
1103 |> maybe_preload_bookmarks(opts)
1104 |> maybe_preload_report_notes(opts)
1105 |> maybe_set_thread_muted_field(opts)
1106 |> maybe_order(opts)
1107 |> restrict_recipients(recipients, opts[:user])
1108 |> restrict_replies(opts)
1109 |> restrict_tag(opts)
1110 |> restrict_tag_reject(opts)
1111 |> restrict_tag_all(opts)
1112 |> restrict_since(opts)
1113 |> restrict_local(opts)
1114 |> restrict_actor(opts)
1115 |> restrict_type(opts)
1116 |> restrict_state(opts)
1117 |> restrict_favorited_by(opts)
1118 |> restrict_blocked(restrict_blocked_opts)
1119 |> restrict_muted(restrict_muted_opts)
1120 |> restrict_media(opts)
1121 |> restrict_visibility(opts)
1122 |> restrict_thread_visibility(opts, config)
1123 |> restrict_reblogs(opts)
1124 |> restrict_pinned(opts)
1125 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1126 |> restrict_instance(opts)
1127 |> Activity.restrict_deactivated_users()
1128 |> exclude_poll_votes(opts)
1129 |> exclude_invisible_actors(opts)
1130 |> exclude_visibility(opts)
1133 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1134 list_memberships = Pleroma.List.memberships(opts[:user])
1136 fetch_activities_query(recipients ++ list_memberships, opts)
1137 |> Pagination.fetch_paginated(opts, pagination)
1139 |> maybe_update_cc(list_memberships, opts[:user])
1143 Fetch favorites activities of user with order by sort adds to favorites
1145 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1146 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1148 |> Activity.Queries.by_actor()
1149 |> Activity.Queries.by_type("Like")
1150 |> Activity.with_joined_object()
1151 |> Object.with_joined_activity()
1152 |> select([_like, object, activity], %{activity | object: object})
1153 |> order_by([like, _, _], desc_nulls_last: like.id)
1154 |> Pagination.fetch_paginated(
1155 Map.merge(params, %{skip_order: true}),
1161 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1162 Enum.map(activities, fn
1163 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1164 if Enum.any?(bcc, &(&1 in list_memberships)) do
1165 update_in(activity.data["cc"], &[user_ap_id | &1])
1175 defp maybe_update_cc(activities, _, _), do: activities
1177 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1178 from(activity in query,
1180 fragment("? && ?", activity.recipients, ^recipients) or
1181 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1182 ^Constants.as_public() in activity.recipients)
1186 def fetch_activities_bounded(
1188 recipients_with_public,
1190 pagination \\ :keyset
1192 fetch_activities_query([], opts)
1193 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1194 |> Pagination.fetch_paginated(opts, pagination)
1198 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1199 def upload(file, opts \\ []) do
1200 with {:ok, data} <- Upload.store(file, opts) do
1201 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1203 Repo.insert(%Object{data: obj_data})
1207 @spec get_actor_url(any()) :: binary() | nil
1208 defp get_actor_url(url) when is_binary(url), do: url
1209 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1211 defp get_actor_url(url) when is_list(url) do
1217 defp get_actor_url(_url), do: nil
1219 defp object_to_user_data(data) do
1221 data["icon"]["url"] &&
1224 "url" => [%{"href" => data["icon"]["url"]}]
1228 data["image"]["url"] &&
1231 "url" => [%{"href" => data["image"]["url"]}]
1236 |> Map.get("attachment", [])
1237 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1238 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1242 |> Map.get("tag", [])
1244 %{"type" => "Emoji"} -> true
1247 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1248 {String.trim(name, ":"), url}
1251 locked = data["manuallyApprovesFollowers"] || false
1252 data = Transmogrifier.maybe_fix_user_object(data)
1253 discoverable = data["discoverable"] || false
1254 invisible = data["invisible"] || false
1255 actor_type = data["type"] || "Person"
1258 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1259 data["publicKey"]["publicKeyPem"]
1265 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1266 data["endpoints"]["sharedInbox"]
1273 uri: get_actor_url(data["url"]),
1279 discoverable: discoverable,
1280 invisible: invisible,
1283 follower_address: data["followers"],
1284 following_address: data["following"],
1285 bio: data["summary"],
1286 actor_type: actor_type,
1287 also_known_as: Map.get(data, "alsoKnownAs", []),
1288 public_key: public_key,
1289 inbox: data["inbox"],
1290 shared_inbox: shared_inbox
1293 # nickname can be nil because of virtual actors
1294 if data["preferredUsername"] do
1298 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1301 Map.put(user_data, :nickname, nil)
1305 def fetch_follow_information_for_user(user) do
1306 with {:ok, following_data} <-
1307 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1308 {:ok, hide_follows} <- collection_private(following_data),
1309 {:ok, followers_data} <-
1310 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1311 {:ok, hide_followers} <- collection_private(followers_data) do
1314 hide_follows: hide_follows,
1315 follower_count: normalize_counter(followers_data["totalItems"]),
1316 following_count: normalize_counter(following_data["totalItems"]),
1317 hide_followers: hide_followers
1320 {:error, _} = e -> e
1325 defp normalize_counter(counter) when is_integer(counter), do: counter
1326 defp normalize_counter(_), do: 0
1328 def maybe_update_follow_information(user_data) do
1329 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1330 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1332 {:collections_available,
1333 !!(user_data[:following_address] && user_data[:follower_address])},
1335 fetch_follow_information_for_user(user_data) do
1336 info = Map.merge(user_data[:info] || %{}, info)
1339 |> Map.put(:info, info)
1341 {:user_type_check, false} ->
1344 {:collections_available, false} ->
1347 {:enabled, false} ->
1352 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1359 defp collection_private(%{"first" => %{"type" => type}})
1360 when type in ["CollectionPage", "OrderedCollectionPage"],
1363 defp collection_private(%{"first" => first}) do
1364 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1365 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1368 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1369 {:error, _} = e -> e
1374 defp collection_private(_data), do: {:ok, true}
1376 def user_data_from_user_object(data) do
1377 with {:ok, data} <- MRF.filter(data) do
1378 {:ok, object_to_user_data(data)}
1384 def fetch_and_prepare_user_from_ap_id(ap_id) do
1385 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1386 {:ok, data} <- user_data_from_user_object(data) do
1387 {:ok, maybe_update_follow_information(data)}
1389 {:error, "Object has been deleted" = e} ->
1390 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1394 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1399 def make_user_from_ap_id(ap_id) do
1400 user = User.get_cached_by_ap_id(ap_id)
1402 if user && !User.ap_enabled?(user) do
1403 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1405 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1408 |> User.remote_user_changeset(data)
1409 |> User.update_and_set_cache()
1412 |> User.remote_user_changeset()
1420 def make_user_from_nickname(nickname) do
1421 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1422 make_user_from_ap_id(ap_id)
1424 _e -> {:error, "No AP id in WebFinger"}
1428 # filter out broken threads
1429 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1430 entire_thread_visible_for_user?(activity, user)
1433 # do post-processing on a specific activity
1434 def contain_activity(%Activity{} = activity, %User{} = user) do
1435 contain_broken_threads(activity, user)
1438 def fetch_direct_messages_query do
1440 |> restrict_type(%{type: "Create"})
1441 |> restrict_visibility(%{visibility: "direct"})
1442 |> order_by([activity], asc: activity.id)