1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
32 require Pleroma.Constants
34 # For Announce activities, we filter the recipients based on following status for any actors
35 # that match actual users. See issue #164 for more information about why this is necessary.
36 defp get_recipients(%{"type" => "Announce"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = User.get_cached_by_ap_id(data["actor"])
43 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
44 case User.get_cached_by_ap_id(recipient) do
46 user -> User.following?(user, actor)
53 defp get_recipients(%{"type" => "Create"} = data) do
54 to = Map.get(data, "to", [])
55 cc = Map.get(data, "cc", [])
56 bcc = Map.get(data, "bcc", [])
57 actor = Map.get(data, "actor", [])
58 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
62 defp get_recipients(data) do
63 to = Map.get(data, "to", [])
64 cc = Map.get(data, "cc", [])
65 bcc = Map.get(data, "bcc", [])
66 recipients = Enum.concat([to, cc, bcc])
70 defp check_actor_is_active(nil), do: true
72 defp check_actor_is_active(actor) when is_binary(actor) do
73 case User.get_cached_by_ap_id(actor) do
74 %User{deactivated: deactivated} -> not deactivated
# Checks the embedded object's content against the configured `[:instance, :remote_limit]`.
# Returns `true` when the content length (as counted by `String.length/1`) is within the limit.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

# Anything that does not carry non-nil object content passes the check.
defp check_remote_limit(_), do: true
# Calls `User.increase_note_count/1` for `actor` when `object` is public;
# otherwise hands back `{:ok, actor}` untouched.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Calls `User.decrease_note_count/1` for `actor` when `object` is public;
# otherwise hands back `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
94 defp increase_replies_count_if_reply(%{
95 "object" => %{"inReplyTo" => reply_ap_id} = object,
98 if is_public?(object) do
99 Object.increase_replies_count(reply_ap_id)
103 defp increase_replies_count_if_reply(_create_data), do: :noop
105 defp increase_poll_votes_if_vote(%{
106 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
110 Object.increase_vote_count(reply_ap_id, name, actor)
113 defp increase_poll_votes_if_vote(_create_data), do: :noop
115 @object_types ["ChatMessage"]
116 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
117 def persist(%{"type" => type} = object, meta) when type in @object_types do
118 with {:ok, object} <- Object.create(object) do
123 def persist(object, meta) do
124 with local <- Keyword.fetch!(meta, :local),
125 {recipients, _, _} <- get_recipients(object),
127 Repo.insert(%Activity{
130 recipients: recipients,
131 actor: object["actor"]
133 {:ok, activity, meta}
137 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
138 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
139 with nil <- Activity.normalize(map),
140 map <- lazy_put_activity_defaults(map, fake),
141 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
142 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
143 {:ok, map} <- MRF.filter(map),
144 {recipients, _, _} = get_recipients(map),
145 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
146 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
147 {:ok, map, object} <- insert_full_object(map) do
149 Repo.insert(%Activity{
153 recipients: recipients
156 # Splice in the child object if we have one.
157 activity = Maps.put_if_present(activity, :object, object)
159 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
163 %Activity{} = activity ->
166 {:fake, true, map, recipients} ->
167 activity = %Activity{
171 recipients: recipients,
175 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
183 def notify_and_stream(activity) do
184 Notification.create_notifications(activity)
186 conversation = create_or_bump_conversation(activity, activity.actor)
187 participations = get_participations(conversation)
189 stream_out_participations(participations)
192 defp create_or_bump_conversation(activity, actor) do
193 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
194 %User{} = user <- User.get_cached_by_ap_id(actor) do
195 Participation.mark_as_read(user, conversation)
200 defp get_participations({:ok, conversation}) do
202 |> Repo.preload(:participations, force: true)
203 |> Map.get(:participations)
206 defp get_participations(_), do: []
208 def stream_out_participations(participations) do
211 |> Repo.preload(:user)
213 Streamer.stream("participation", participations)
216 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
217 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
218 conversation = Repo.preload(conversation, :participations)
221 fetch_latest_activity_id_for_context(conversation.ap_id, %{
226 if last_activity_id do
227 stream_out_participations(conversation.participations)
232 def stream_out_participations(_, _), do: :noop
234 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
235 when data_type in ["Create", "Announce", "Delete"] do
237 |> Topics.get_activity_topics()
238 |> Streamer.stream(activity)
241 def stream_out(_activity) do
245 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
246 def create(params, fake \\ false) do
247 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
252 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
253 additional = params[:additional] || %{}
254 # only accept false as false value
255 local = !(params[:local] == false)
256 published = params[:published]
257 quick_insert? = Config.get([:env]) == :benchmark
261 %{to: to, actor: actor, published: published, context: context, object: object},
265 with {:ok, activity} <- insert(create_data, local, fake),
266 {:fake, false, activity} <- {:fake, fake, activity},
267 _ <- increase_replies_count_if_reply(create_data),
268 _ <- increase_poll_votes_if_vote(create_data),
269 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
270 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
271 _ <- notify_and_stream(activity),
272 :ok <- maybe_federate(activity) do
275 {:quick_insert, true, activity} ->
278 {:fake, true, activity} ->
282 Repo.rollback(message)
286 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
287 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
288 additional = params[:additional] || %{}
289 # only accept false as false value
290 local = !(params[:local] == false)
291 published = params[:published]
295 %{to: to, actor: actor, published: published, context: context, object: object},
299 with {:ok, activity} <- insert(listen_data, local),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_federate(activity) do
# Thin wrapper: delegates to `accept_or_reject/2` with the "Accept" activity type.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
# Thin wrapper: delegates to `accept_or_reject/2` with the "Reject" activity type.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
316 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
317 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
318 local = Map.get(params, :local, true)
319 activity_id = Map.get(params, :activity_id, nil)
322 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
323 |> Maps.put_if_present("id", activity_id)
325 with {:ok, activity} <- insert(data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
332 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
333 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
334 local = !(params[:local] == false)
335 activity_id = params[:activity_id]
345 |> Maps.put_if_present("id", activity_id)
347 with {:ok, activity} <- insert(data, local),
348 _ <- notify_and_stream(activity),
349 :ok <- maybe_federate(activity) do
354 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
355 {:ok, Activity.t()} | {:error, any()}
356 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
357 with {:ok, result} <-
358 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
363 defp do_follow(follower, followed, activity_id, local, opts) do
364 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
365 data = make_follow_data(follower, followed, activity_id)
367 with {:ok, activity} <- insert(data, local),
368 _ <- skip_notify_and_stream || notify_and_stream(activity),
369 :ok <- maybe_federate(activity) do
372 {:error, error} -> Repo.rollback(error)
376 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
377 {:ok, Activity.t()} | nil | {:error, any()}
378 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
379 with {:ok, result} <-
380 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
385 defp do_unfollow(follower, followed, activity_id, local) do
386 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
387 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
388 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
389 {:ok, activity} <- insert(unfollow_data, local),
390 _ <- notify_and_stream(activity),
391 :ok <- maybe_federate(activity) do
395 {:error, error} -> Repo.rollback(error)
399 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
400 {:ok, Activity.t()} | {:error, any()}
401 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
402 with {:ok, result} <-
403 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
408 defp do_block(blocker, blocked, activity_id, local) do
409 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
411 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
412 unfollow(blocker, blocked, nil, local)
415 block_data = make_block_data(blocker, blocked, activity_id)
417 with {:ok, activity} <- insert(block_data, local),
418 _ <- notify_and_stream(activity),
419 :ok <- maybe_federate(activity) do
422 {:error, error} -> Repo.rollback(error)
426 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
436 # only accept false as false value
437 local = !(params[:local] == false)
438 forward = !(params[:forward] == false)
440 additional = params[:additional] || %{}
444 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
446 Map.merge(additional, %{"to" => [], "cc" => []})
449 with flag_data <- make_flag_data(params, additional),
450 {:ok, activity} <- insert(flag_data, local),
451 {:ok, stripped_activity} <- strip_report_status_data(activity),
452 _ <- notify_and_stream(activity),
453 :ok <- maybe_federate(stripped_activity) do
454 User.all_superusers()
455 |> Enum.filter(fn user -> not is_nil(user.email) end)
456 |> Enum.each(fn superuser ->
458 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
459 |> Pleroma.Emails.Mailer.deliver_async()
466 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
467 def move(%User{} = origin, %User{} = target, local \\ true) do
470 "actor" => origin.ap_id,
471 "object" => origin.ap_id,
472 "target" => target.ap_id
475 with true <- origin.ap_id in target.also_known_as,
476 {:ok, activity} <- insert(params, local),
477 _ <- notify_and_stream(activity) do
478 maybe_federate(activity)
480 BackgroundWorker.enqueue("move_following", %{
481 "origin_id" => origin.id,
482 "target_id" => target.id
487 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
492 def fetch_activities_for_context_query(context, opts) do
493 public = [Constants.as_public()]
497 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
500 from(activity in Activity)
501 |> maybe_preload_objects(opts)
502 |> maybe_preload_bookmarks(opts)
503 |> maybe_set_thread_muted_field(opts)
504 |> restrict_blocked(opts)
505 |> restrict_recipients(recipients, opts[:user])
509 "?->>'type' = ? and ?->>'context' = ?",
516 |> exclude_poll_votes(opts)
518 |> order_by([activity], desc: activity.id)
521 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
522 def fetch_activities_for_context(context, opts \\ %{}) do
524 |> fetch_activities_for_context_query(opts)
528 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
529 FlakeId.Ecto.CompatType.t() | nil
530 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
532 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
# Fetches a page of activities addressed to the public collection.
# The `:user` option is dropped before the query is built, so the same user-less options are
# used for the query, the unlisted restriction and the pagination.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
548 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
549 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
551 |> Map.put(:restrict_unlisted, true)
552 |> fetch_public_or_unlisted_activities(pagination)
555 @valid_visibilities ~w[direct unlisted public private]
557 defp restrict_visibility(query, %{visibility: visibility})
558 when is_list(visibility) do
559 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
564 "activity_visibility(?, ?, ?) = ANY (?)",
572 Logger.error("Could not restrict visibility to #{visibility}")
576 defp restrict_visibility(query, %{visibility: visibility})
577 when visibility in @valid_visibilities do
581 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Handles a `:visibility` value that is not one of `@valid_visibilities`: the problem is logged
# and the result of `Logger.error/1` is returned (not a query), as before.
#
# `nil` is excluded from this clause, mirroring `exclude_visibility/2`, so that an explicit
# `visibility: nil` option means "no restriction" and reaches the pass-through clause below
# instead of breaking the query pipeline.
defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  # `inspect/1` keeps the log call from raising on values without a `String.Chars`
  # implementation (maps, tuples, ...).
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
end

# No (or nil) visibility option: leave the query untouched.
defp restrict_visibility(query, _visibility), do: query
592 defp exclude_visibility(query, %{exclude_visibilities: visibility})
593 when is_list(visibility) do
594 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
599 "activity_visibility(?, ?, ?) = ANY (?)",
607 Logger.error("Could not exclude visibility to #{visibility}")
612 defp exclude_visibility(query, %{exclude_visibilities: visibility})
613 when visibility in @valid_visibilities do
618 "activity_visibility(?, ?, ?) = ?",
627 defp exclude_visibility(query, %{exclude_visibilities: visibility})
628 when visibility not in [nil | @valid_visibilities] do
629 Logger.error("Could not exclude visibility to #{visibility}")
633 defp exclude_visibility(query, _visibility), do: query
635 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
638 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
641 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
644 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
648 defp restrict_thread_visibility(query, _, _), do: query
650 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
653 |> Map.put(:user, reading_user)
654 |> Map.put(:actor_id, user.ap_id)
657 godmode: params[:godmode],
658 reading_user: reading_user
660 |> user_activities_recipients()
661 |> fetch_activities(params)
665 def fetch_user_activities(user, reading_user, params \\ %{}) do
668 |> Map.put(:type, ["Create", "Announce"])
669 |> Map.put(:user, reading_user)
670 |> Map.put(:actor_id, user.ap_id)
671 |> Map.put(:pinned_activity_ids, user.pinned_activities)
674 if User.blocks?(reading_user, user) do
678 |> Map.put(:blocking_user, reading_user)
679 |> Map.put(:muting_user, reading_user)
683 godmode: params[:godmode],
684 reading_user: reading_user
686 |> user_activities_recipients()
687 |> fetch_activities(params)
691 def fetch_statuses(reading_user, params) do
692 params = Map.put(params, :type, ["Create", "Announce"])
695 godmode: params[:godmode],
696 reading_user: reading_user
698 |> user_activities_recipients()
699 |> fetch_activities(params, :offset)
703 defp user_activities_recipients(%{godmode: true}), do: []
705 defp user_activities_recipients(%{reading_user: reading_user}) do
707 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
709 [Constants.as_public()]
# An empty `since_id` means "no lower bound".
defp restrict_since(query, %{since_id: ""}), do: query

# Keeps only activities whose id is greater than `since_id`.
defp restrict_since(query, %{since_id: since_id}) do
  from a in query,
    where: a.id > ^since_id
end

defp restrict_since(query, _), do: query
# Rejecting by tag is refused outright when preloading is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")
725 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
727 [_activity, object] in query,
728 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
732 defp restrict_tag_reject(query, _), do: query
# Requiring all tags is refused outright when preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")
738 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
740 [_activity, object] in query,
741 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
745 defp restrict_tag_all(query, _), do: query
# Filtering by tag is refused outright when preloading is skipped.
defp restrict_tag(_query, %{tag: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")
751 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
753 [_activity, object] in query,
754 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
758 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
760 [_activity, object] in query,
761 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
765 defp restrict_tag(query, _), do: query
# An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

# Without a user: keep activities whose recipients overlap (`&&`) the given list.
defp restrict_recipients(query, recipients, nil) do
  from a in query,
    where: fragment("? && ?", ^recipients, a.recipients)
end
773 defp restrict_recipients(query, recipients, user) do
776 where: fragment("? && ?", ^recipients, activity.recipients),
777 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  from a in query,
    where: a.local == true
end

defp restrict_local(query, _), do: query
# Keeps only activities whose actor equals `actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  from a in query,
    where: a.actor == ^actor_id
end

defp restrict_actor(query, _), do: query
# Restricts to a single activity type given as a string.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  from a in query,
    where: fragment("?->>'type' = ?", a.data, ^type)
end

# Any other `:type` value is bound to `ANY(?)` — presumably a list of type names.
defp restrict_type(query, %{type: type}) do
  from a in query,
    where: fragment("?->>'type' = ANY(?)", a.data, ^type)
end

defp restrict_type(query, _), do: query
# Keeps only activities whose `data->>'state'` equals `state`.
defp restrict_state(query, %{state: state}) do
  from a in query,
    where: fragment("?->>'state' = ?", a.data, ^state)
end

defp restrict_state(query, _), do: query
809 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
811 [_activity, object] in query,
812 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
816 defp restrict_favorited_by(query, _), do: query
# Media-only filtering is refused outright when preloading is skipped.
defp restrict_media(_query, %{only_media: _, skip_preload: true}),
  do: raise("Can't use the child object without preloading!")
822 defp restrict_media(query, %{only_media: true}) do
824 [_activity, object] in query,
825 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
829 defp restrict_media(query, _), do: query
831 defp restrict_replies(query, %{exclude_replies: true}) do
833 [_activity, object] in query,
834 where: fragment("?->>'inReplyTo' is null", object.data)
838 defp restrict_replies(query, %{
839 reply_filtering_user: user,
840 reply_visibility: "self"
843 [activity, object] in query,
846 "?->>'inReplyTo' is null OR ? = ANY(?)",
854 defp restrict_replies(query, %{
855 reply_filtering_user: user,
856 reply_visibility: "following"
859 [activity, object] in query,
862 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
864 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
873 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose type is 'Announce'.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  from a in query,
    where: fragment("?->>'type' != 'Announce'", a.data)
end

defp restrict_reblogs(query, _), do: query
881 defp restrict_muted(query, %{with_muted: true}), do: query
883 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
884 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
887 from([activity] in query,
888 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
889 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
892 unless opts[:skip_preload] do
893 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
899 defp restrict_muted(query, _), do: query
901 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
902 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
903 domain_blocks = user.domain_blocks || []
905 following_ap_ids = User.get_friends_ap_ids(user)
908 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
911 [activity, object: o] in query,
912 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
913 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
916 "recipients_contain_blocked_domains(?, ?) = false",
922 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
929 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
937 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
946 defp restrict_blocked(query, _), do: query
948 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
953 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
955 ^[Constants.as_public()]
960 defp restrict_unlisted(query, _), do: query
# With `pinned: true`, keeps only activities whose id is in `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  from a in query,
    where: a.id in ^ids
end

defp restrict_pinned(query, _), do: query
968 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
969 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
975 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
983 defp restrict_muted_reblogs(query, _), do: query
985 defp restrict_instance(query, %{instance: instance}) do
990 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
994 from(activity in query, where: activity.actor in ^users)
997 defp restrict_instance(query, _), do: query
999 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1001 defp exclude_poll_votes(query, _) do
1002 if has_named_binding?(query, :object) do
1003 from([activity, object: o] in query,
1004 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1011 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1013 defp exclude_chat_messages(query, _) do
1014 if has_named_binding?(query, :object) do
1015 from([activity, object: o] in query,
1016 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1023 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1025 defp exclude_invisible_actors(query, _opts) do
1027 User.Query.build(%{invisible: true, select: [:ap_id]})
1029 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1031 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity with the given (binary) id from the result set.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  from a in query,
    where: a.id != ^id
end

defp exclude_id(query, _), do: query
1040 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1042 defp maybe_preload_objects(query, _) do
1044 |> Activity.with_preloaded_object()
1047 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1049 defp maybe_preload_bookmarks(query, opts) do
1051 |> Activity.with_preloaded_bookmark(opts[:user])
1054 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1056 |> Activity.with_preloaded_report_notes()
1059 defp maybe_preload_report_notes(query, _), do: query
1061 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1063 defp maybe_set_thread_muted_field(query, opts) do
1065 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1068 defp maybe_order(query, %{order: :desc}) do
1070 |> order_by(desc: :id)
1073 defp maybe_order(query, %{order: :asc}) do
1075 |> order_by(asc: :id)
1078 defp maybe_order(query, _), do: query
1080 defp fetch_activities_query_ap_ids_ops(opts) do
1081 source_user = opts[:muting_user]
1082 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1084 ap_id_relationships =
1085 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1086 [:block | ap_id_relationships]
1091 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1093 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1094 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1096 restrict_muted_reblogs_opts =
1097 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1099 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1102 def fetch_activities_query(recipients, opts \\ %{}) do
1103 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1104 fetch_activities_query_ap_ids_ops(opts)
1107 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1111 |> maybe_preload_objects(opts)
1112 |> maybe_preload_bookmarks(opts)
1113 |> maybe_preload_report_notes(opts)
1114 |> maybe_set_thread_muted_field(opts)
1115 |> maybe_order(opts)
1116 |> restrict_recipients(recipients, opts[:user])
1117 |> restrict_replies(opts)
1118 |> restrict_tag(opts)
1119 |> restrict_tag_reject(opts)
1120 |> restrict_tag_all(opts)
1121 |> restrict_since(opts)
1122 |> restrict_local(opts)
1123 |> restrict_actor(opts)
1124 |> restrict_type(opts)
1125 |> restrict_state(opts)
1126 |> restrict_favorited_by(opts)
1127 |> restrict_blocked(restrict_blocked_opts)
1128 |> restrict_muted(restrict_muted_opts)
1129 |> restrict_media(opts)
1130 |> restrict_visibility(opts)
1131 |> restrict_thread_visibility(opts, config)
1132 |> restrict_reblogs(opts)
1133 |> restrict_pinned(opts)
1134 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1135 |> restrict_instance(opts)
1136 |> Activity.restrict_deactivated_users()
1137 |> exclude_poll_votes(opts)
1138 |> exclude_chat_messages(opts)
1139 |> exclude_invisible_actors(opts)
1140 |> exclude_visibility(opts)
1143 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1144 list_memberships = Pleroma.List.memberships(opts[:user])
1146 fetch_activities_query(recipients ++ list_memberships, opts)
1147 |> Pagination.fetch_paginated(opts, pagination)
1149 |> maybe_update_cc(list_memberships, opts[:user])
Fetches the user's favorite activities, ordered by when they were added to favorites.
1155 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1156 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1158 |> Activity.Queries.by_actor()
1159 |> Activity.Queries.by_type("Like")
1160 |> Activity.with_joined_object()
1161 |> Object.with_joined_activity()
1162 |> select([_like, object, activity], %{activity | object: object})
1163 |> order_by([like, _, _], desc_nulls_last: like.id)
1164 |> Pagination.fetch_paginated(
1165 Map.merge(params, %{skip_order: true}),
1171 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1172 Enum.map(activities, fn
1173 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1174 if Enum.any?(bcc, &(&1 in list_memberships)) do
1175 update_in(activity.data["cc"], &[user_ap_id | &1])
1185 defp maybe_update_cc(activities, _, _), do: activities
1187 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1188 from(activity in query,
1190 fragment("? && ?", activity.recipients, ^recipients) or
1191 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1192 ^Constants.as_public() in activity.recipients)
1196 def fetch_activities_bounded(
1198 recipients_with_public,
1200 pagination \\ :keyset
1202 fetch_activities_query([], opts)
1203 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1204 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` through `Upload.store/2` and, on `{:ok, data}`, inserts an `Object` whose data
# is the upload data passed through `Maps.put_if_present/3` with "actor" and `opts[:actor]`.
# Any other result of `Upload.store/2` is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      %Object{data: Maps.put_if_present(data, "actor", opts[:actor])}
      |> Repo.insert()

    failure ->
      failure
  end
end
1217 @spec get_actor_url(any()) :: binary() | nil
1218 defp get_actor_url(url) when is_binary(url), do: url
1219 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1221 defp get_actor_url(url) when is_list(url) do
1227 defp get_actor_url(_url), do: nil
1229 defp object_to_user_data(data) do
1231 data["icon"]["url"] &&
1234 "url" => [%{"href" => data["icon"]["url"]}]
1238 data["image"]["url"] &&
1241 "url" => [%{"href" => data["image"]["url"]}]
1246 |> Map.get("attachment", [])
1247 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1248 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1252 |> Map.get("tag", [])
1254 %{"type" => "Emoji"} -> true
1257 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1258 {String.trim(name, ":"), url}
1261 locked = data["manuallyApprovesFollowers"] || false
1262 data = Transmogrifier.maybe_fix_user_object(data)
1263 discoverable = data["discoverable"] || false
1264 invisible = data["invisible"] || false
1265 actor_type = data["type"] || "Person"
1268 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1269 data["publicKey"]["publicKeyPem"]
1275 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1276 data["endpoints"]["sharedInbox"]
1283 uri: get_actor_url(data["url"]),
1289 discoverable: discoverable,
1290 invisible: invisible,
1293 follower_address: data["followers"],
1294 following_address: data["following"],
1295 bio: data["summary"],
1296 actor_type: actor_type,
1297 also_known_as: Map.get(data, "alsoKnownAs", []),
1298 public_key: public_key,
1299 inbox: data["inbox"],
1300 shared_inbox: shared_inbox
1303 # nickname can be nil because of virtual actors
1304 if data["preferredUsername"] do
1308 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1311 Map.put(user_data, :nickname, nil)
1315 def fetch_follow_information_for_user(user) do
1316 with {:ok, following_data} <-
1317 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1318 {:ok, hide_follows} <- collection_private(following_data),
1319 {:ok, followers_data} <-
1320 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1321 {:ok, hide_followers} <- collection_private(followers_data) do
1324 hide_follows: hide_follows,
1325 follower_count: normalize_counter(followers_data["totalItems"]),
1326 following_count: normalize_counter(following_data["totalItems"]),
1327 hide_followers: hide_followers
1330 {:error, _} = e -> e
# Normalizes a remote collection's `totalItems` into a usable counter.
# Only non-negative integers are accepted; anything else (missing, non-integer, or a
# nonsensical negative value sent by a remote server) falls back to 0.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1338 def maybe_update_follow_information(user_data) do
1339 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1340 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1342 {:collections_available,
1343 !!(user_data[:following_address] && user_data[:follower_address])},
1345 fetch_follow_information_for_user(user_data) do
1346 info = Map.merge(user_data[:info] || %{}, info)
1349 |> Map.put(:info, info)
1351 {:user_type_check, false} ->
1354 {:collections_available, false} ->
1357 {:enabled, false} ->
1362 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1369 defp collection_private(%{"first" => %{"type" => type}})
1370 when type in ["CollectionPage", "OrderedCollectionPage"],
1373 defp collection_private(%{"first" => first}) do
1374 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1375 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1378 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1379 {:error, _} = e -> e
1384 defp collection_private(_data), do: {:ok, true}
1386 def user_data_from_user_object(data) do
1387 with {:ok, data} <- MRF.filter(data) do
1388 {:ok, object_to_user_data(data)}
1394 def fetch_and_prepare_user_from_ap_id(ap_id) do
1395 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1396 {:ok, data} <- user_data_from_user_object(data) do
1397 {:ok, maybe_update_follow_information(data)}
1399 {:error, "Object has been deleted" = e} ->
1400 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1404 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1409 def make_user_from_ap_id(ap_id) do
1410 user = User.get_cached_by_ap_id(ap_id)
1412 if user && !User.ap_enabled?(user) do
1413 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1415 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1418 |> User.remote_user_changeset(data)
1419 |> User.update_and_set_cache()
1422 |> User.remote_user_changeset()
# Resolves `nickname` through `WebFinger.finger/1` and, when the result carries a non-nil
# "ap_id", builds the user via `make_user_from_ap_id/1`.
# Every other WebFinger outcome becomes `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently only the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1448 def fetch_direct_messages_query do
1450 |> restrict_type(%{type: "Create"})
1451 |> restrict_visibility(%{visibility: "direct"})
1452 |> order_by([activity], asc: activity.id)