1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Checks the embedded object's "content" against the `[:instance, :remote_limit]`
# setting. Returns the boolean result of comparing the content's
# `String.length/1` with that limit.
defp check_remote_limit(%{"object" => %{"content" => body}}) when body != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

# Anything without an embedded object content (or with a nil content) passes.
defp check_remote_limit(_activity_data), do: true
# Increments the actor's note count via `User.increase_note_count/1`, but only
# when `is_public?/1` holds for `object`; otherwise returns `{:ok, actor}` unchanged.
defp increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Decrements the actor's note count when `object` is public.

Calls `User.decrease_note_count/1` if `is_public?/1` holds for `object`;
otherwise returns `{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
92 Object.increase_vote_count(reply_ap_id, name, actor)
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
109 Repo.insert(%Activity{
112 recipients: recipients,
113 actor: object["actor"]
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
135 recipients: recipients
138 |> maybe_create_activity_expiration()
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
147 %Activity{} = activity ->
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
155 recipients: recipients,
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
173 stream_out_participations(participations)
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
182 defp maybe_create_activity_expiration(result), do: result
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
192 defp get_participations({:ok, conversation}) do
194 |> Repo.preload(:participations, force: true)
195 |> Map.get(:participations)
198 defp get_participations(_), do: []
200 def stream_out_participations(participations) do
203 |> Repo.preload(:user)
205 Streamer.stream("participation", participations)
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
213 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
224 def stream_out_participations(_, _), do: :noop
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
233 def stream_out(_activity) do
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
253 %{to: to, actor: actor, published: published, context: context, object: object},
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
267 {:quick_insert, true, activity} ->
270 {:fake, true, activity} ->
274 Repo.rollback(message)
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
287 %{to: to, actor: actor, published: published, context: context, object: object},
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
@doc """
Creates an "Accept" activity from `params`.

Thin wrapper that delegates to `accept_or_reject/2` with the type `"Accept"`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
@doc """
Creates a "Reject" activity from `params`.

Thin wrapper that delegates to `accept_or_reject/2` with the type `"Reject"`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
325 {:ok, Activity.t()} | {:error, any()}
326 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
327 with {:ok, result} <-
328 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
333 defp do_follow(follower, followed, activity_id, local, opts) do
334 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
335 data = make_follow_data(follower, followed, activity_id)
337 with {:ok, activity} <- insert(data, local),
338 _ <- skip_notify_and_stream || notify_and_stream(activity),
339 :ok <- maybe_federate(activity) do
342 {:error, error} -> Repo.rollback(error)
346 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
347 {:ok, Activity.t()} | nil | {:error, any()}
348 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
355 defp do_unfollow(follower, followed, activity_id, local) do
356 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
357 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
358 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
359 {:ok, activity} <- insert(unfollow_data, local),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
365 {:error, error} -> Repo.rollback(error)
369 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
379 # only accept false as false value
380 local = !(params[:local] == false)
381 forward = !(params[:forward] == false)
383 additional = params[:additional] || %{}
387 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
389 Map.merge(additional, %{"to" => [], "cc" => []})
392 with flag_data <- make_flag_data(params, additional),
393 {:ok, activity} <- insert(flag_data, local),
394 {:ok, stripped_activity} <- strip_report_status_data(activity),
395 _ <- notify_and_stream(activity),
396 :ok <- maybe_federate(stripped_activity) do
397 User.all_superusers()
398 |> Enum.filter(fn user -> not is_nil(user.email) end)
399 |> Enum.each(fn superuser ->
401 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
402 |> Pleroma.Emails.Mailer.deliver_async()
409 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
410 def move(%User{} = origin, %User{} = target, local \\ true) do
413 "actor" => origin.ap_id,
414 "object" => origin.ap_id,
415 "target" => target.ap_id
418 with true <- origin.ap_id in target.also_known_as,
419 {:ok, activity} <- insert(params, local),
420 _ <- notify_and_stream(activity) do
421 maybe_federate(activity)
423 BackgroundWorker.enqueue("move_following", %{
424 "origin_id" => origin.id,
425 "target_id" => target.id
430 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
435 def fetch_activities_for_context_query(context, opts) do
436 public = [Constants.as_public()]
440 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
443 from(activity in Activity)
444 |> maybe_preload_objects(opts)
445 |> maybe_preload_bookmarks(opts)
446 |> maybe_set_thread_muted_field(opts)
447 |> restrict_blocked(opts)
448 |> restrict_recipients(recipients, opts[:user])
452 "?->>'type' = ? and ?->>'context' = ?",
459 |> exclude_poll_votes(opts)
461 |> order_by([activity], desc: activity.id)
464 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
465 def fetch_activities_for_context(context, opts \\ %{}) do
467 |> fetch_activities_for_context_query(opts)
471 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
472 FlakeId.Ecto.CompatType.t() | nil
473 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
475 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
476 |> restrict_visibility(%{visibility: "direct"})
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` key is removed from `opts` before the query is built, so the
result does not depend on a requesting user. The query is then passed through
`restrict_unlisted/2` and paginated with the given `pagination` strategy.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
492 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
493 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
495 |> Map.put(:restrict_unlisted, true)
496 |> fetch_public_or_unlisted_activities(pagination)
499 @valid_visibilities ~w[direct unlisted public private]
501 defp restrict_visibility(query, %{visibility: visibility})
502 when is_list(visibility) do
503 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
508 "activity_visibility(?, ?, ?) = ANY (?)",
516 Logger.error("Could not restrict visibility to #{visibility}")
520 defp restrict_visibility(query, %{visibility: visibility})
521 when visibility in @valid_visibilities do
525 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
529 defp restrict_visibility(_query, %{visibility: visibility})
530 when visibility not in @valid_visibilities do
531 Logger.error("Could not restrict visibility to #{visibility}")
534 defp restrict_visibility(query, _visibility), do: query
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when is_list(visibility) do
538 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
543 "activity_visibility(?, ?, ?) = ANY (?)",
551 Logger.error("Could not exclude visibility to #{visibility}")
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility in @valid_visibilities do
562 "activity_visibility(?, ?, ?) = ?",
571 defp exclude_visibility(query, %{exclude_visibilities: visibility})
572 when visibility not in [nil | @valid_visibilities] do
573 Logger.error("Could not exclude visibility to #{visibility}")
577 defp exclude_visibility(query, _visibility), do: query
579 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
582 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
585 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
588 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
592 defp restrict_thread_visibility(query, _, _), do: query
594 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
597 |> Map.put(:user, reading_user)
598 |> Map.put(:actor_id, user.ap_id)
601 godmode: params[:godmode],
602 reading_user: reading_user
604 |> user_activities_recipients()
605 |> fetch_activities(params)
609 def fetch_user_activities(user, reading_user, params \\ %{}) do
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
618 if User.blocks?(reading_user, user) do
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
627 godmode: params[:godmode],
628 reading_user: reading_user
630 |> user_activities_recipients()
631 |> fetch_activities(params)
635 def fetch_statuses(reading_user, params) do
636 params = Map.put(params, :type, ["Create", "Announce"])
639 godmode: params[:godmode],
640 reading_user: reading_user
642 |> user_activities_recipients()
643 |> fetch_activities(params, :offset)
647 defp user_activities_recipients(%{godmode: true}), do: []
649 defp user_activities_recipients(%{reading_user: reading_user}) do
651 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
653 [Constants.as_public()]
657 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
658 raise "Can't use the child object without preloading!"
661 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
663 [activity, object] in query,
666 "?->>'type' != ? or ?->>'actor' != ?",
675 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
# An empty-string `:since_id`, or no `:since_id` at all, leaves the query untouched.
defp restrict_since(query, %{since_id: since_id}) when since_id != "" do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
685 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
686 raise "Can't use the child object without preloading!"
689 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
691 [_activity, object] in query,
692 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
696 defp restrict_tag_reject(query, _), do: query
698 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
699 raise "Can't use the child object without preloading!"
702 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
709 defp restrict_tag_all(query, _), do: query
711 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
712 raise "Can't use the child object without preloading!"
715 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
717 [_activity, object] in query,
718 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
722 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
724 [_activity, object] in query,
725 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
729 defp restrict_tag(query, _), do: query
731 defp restrict_recipients(query, [], _user), do: query
733 defp restrict_recipients(query, recipients, nil) do
734 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
737 defp restrict_recipients(query, recipients, user) do
740 where: fragment("? && ?", ^recipients, activity.recipients),
741 or_where: activity.actor == ^user.ap_id
# Limits the query to local activities when `local_only: true` is given;
# any other options leave the query untouched.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
# Limits the query to activities whose actor equals `:actor_id`, when that
# option is present; otherwise the query is returned as-is.
defp restrict_actor(query, %{actor_id: actor_ap_id}) do
  where(query, [activity], activity.actor == ^actor_ap_id)
end

defp restrict_actor(query, _opts), do: query
# Filters on the activity's "type" field.
# A binary `:type` is matched exactly; any other `:type` value is matched with
# `= ANY(?)`. Without `:type` the query is returned unchanged.
defp restrict_type(query, %{type: single_type}) when is_binary(single_type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^single_type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
# Filters on the activity's "state" field when `:state` is given;
# otherwise the query is returned unchanged.
defp restrict_state(query, %{state: wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _opts), do: query
773 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
775 [_activity, object] in query,
776 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
780 defp restrict_favorited_by(query, _), do: query
782 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
783 raise "Can't use the child object without preloading!"
786 defp restrict_media(query, %{only_media: true}) do
788 [activity, object] in query,
789 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
790 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
794 defp restrict_media(query, _), do: query
796 defp restrict_replies(query, %{exclude_replies: true}) do
798 [_activity, object] in query,
799 where: fragment("?->>'inReplyTo' is null", object.data)
803 defp restrict_replies(query, %{
804 reply_filtering_user: user,
805 reply_visibility: "self"
808 [activity, object] in query,
811 "?->>'inReplyTo' is null OR ? = ANY(?)",
819 defp restrict_replies(query, %{
820 reply_filtering_user: user,
821 reply_visibility: "following"
824 [activity, object] in query,
827 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
829 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
838 defp restrict_replies(query, _), do: query
# Drops "Announce" activities from the query when `exclude_reblogs: true`
# is given; any other options leave the query untouched.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
846 defp restrict_muted(query, %{with_muted: true}), do: query
848 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
849 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
852 from([activity] in query,
853 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
854 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
857 unless opts[:skip_preload] do
858 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
864 defp restrict_muted(query, _), do: query
866 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
867 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
868 domain_blocks = user.domain_blocks || []
870 following_ap_ids = User.get_friends_ap_ids(user)
873 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
876 [activity, object: o] in query,
877 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
878 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
881 "recipients_contain_blocked_domains(?, ?) = false",
887 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
894 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
902 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
911 defp restrict_blocked(query, _), do: query
913 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
918 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
920 ^[Constants.as_public()]
925 defp restrict_unlisted(query, _), do: query
# Limits the query to the ids in `:pinned_activity_ids`, but only when
# `pinned: true` is also set; otherwise the query is returned unchanged.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
933 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
934 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
940 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
948 defp restrict_muted_reblogs(query, _), do: query
950 defp restrict_instance(query, %{instance: instance}) do
955 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
959 from(activity in query, where: activity.actor in ^users)
962 defp restrict_instance(query, _), do: query
964 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
966 defp exclude_poll_votes(query, _) do
967 if has_named_binding?(query, :object) do
968 from([activity, object: o] in query,
969 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
976 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
978 defp exclude_chat_messages(query, _) do
979 if has_named_binding?(query, :object) do
980 from([activity, object: o] in query,
981 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
988 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
990 defp exclude_invisible_actors(query, _opts) do
992 User.Query.build(%{invisible: true, select: [:ap_id]})
994 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
996 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity with the given binary `:exclude_id` from the results.
# A missing or non-binary `:exclude_id` leaves the query untouched.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1005 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1007 defp maybe_preload_objects(query, _) do
1009 |> Activity.with_preloaded_object()
1012 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1014 defp maybe_preload_bookmarks(query, opts) do
1016 |> Activity.with_preloaded_bookmark(opts[:user])
1019 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1021 |> Activity.with_preloaded_report_notes()
1024 defp maybe_preload_report_notes(query, _), do: query
1026 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1028 defp maybe_set_thread_muted_field(query, opts) do
1030 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1033 defp maybe_order(query, %{order: :desc}) do
1035 |> order_by(desc: :id)
1038 defp maybe_order(query, %{order: :asc}) do
1040 |> order_by(asc: :id)
1043 defp maybe_order(query, _), do: query
1045 defp fetch_activities_query_ap_ids_ops(opts) do
1046 source_user = opts[:muting_user]
1047 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1049 ap_id_relationships =
1050 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1051 [:block | ap_id_relationships]
1056 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1058 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1059 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1061 restrict_muted_reblogs_opts =
1062 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1064 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1067 def fetch_activities_query(recipients, opts \\ %{}) do
1068 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1069 fetch_activities_query_ap_ids_ops(opts)
1072 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1076 |> maybe_preload_objects(opts)
1077 |> maybe_preload_bookmarks(opts)
1078 |> maybe_preload_report_notes(opts)
1079 |> maybe_set_thread_muted_field(opts)
1080 |> maybe_order(opts)
1081 |> restrict_recipients(recipients, opts[:user])
1082 |> restrict_replies(opts)
1083 |> restrict_tag(opts)
1084 |> restrict_tag_reject(opts)
1085 |> restrict_tag_all(opts)
1086 |> restrict_since(opts)
1087 |> restrict_local(opts)
1088 |> restrict_actor(opts)
1089 |> restrict_type(opts)
1090 |> restrict_state(opts)
1091 |> restrict_favorited_by(opts)
1092 |> restrict_blocked(restrict_blocked_opts)
1093 |> restrict_muted(restrict_muted_opts)
1094 |> restrict_media(opts)
1095 |> restrict_visibility(opts)
1096 |> restrict_thread_visibility(opts, config)
1097 |> restrict_reblogs(opts)
1098 |> restrict_pinned(opts)
1099 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1100 |> restrict_instance(opts)
1101 |> restrict_announce_object_actor(opts)
1102 |> Activity.restrict_deactivated_users()
1103 |> exclude_poll_votes(opts)
1104 |> exclude_chat_messages(opts)
1105 |> exclude_invisible_actors(opts)
1106 |> exclude_visibility(opts)
1109 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1110 list_memberships = Pleroma.List.memberships(opts[:user])
1112 fetch_activities_query(recipients ++ list_memberships, opts)
1113 |> Pagination.fetch_paginated(opts, pagination)
1115 |> maybe_update_cc(list_memberships, opts[:user])
Fetches the activities a user has favourited, sorted by the order in which they were added to favourites.
1121 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1122 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1124 |> Activity.Queries.by_actor()
1125 |> Activity.Queries.by_type("Like")
1126 |> Activity.with_joined_object()
1127 |> Object.with_joined_activity()
1128 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1129 |> order_by([like, _, _], desc_nulls_last: like.id)
1130 |> Pagination.fetch_paginated(
1131 Map.merge(params, %{skip_order: true}),
1136 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1137 Enum.map(activities, fn
1138 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1139 if Enum.any?(bcc, &(&1 in list_memberships)) do
1140 update_in(activity.data["cc"], &[user_ap_id | &1])
1150 defp maybe_update_cc(activities, _, _), do: activities
1152 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1153 from(activity in query,
1155 fragment("? && ?", activity.recipients, ^recipients) or
1156 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1157 ^Constants.as_public() in activity.recipients)
1161 def fetch_activities_bounded(
1163 recipients_with_public,
1165 pagination \\ :keyset
1167 fetch_activities_query([], opts)
1168 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1169 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Stores `file` with `Upload.store/2` and inserts an `Object` for it.

On a successful store, the returned data (with `"actor"` added through
`Maps.put_if_present/3` from `opts[:actor]`) becomes the object's `data`, and
the result of `Repo.insert/1` is returned. Any other result from
`Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    store_failure ->
      store_failure
  end
end
1182 @spec get_actor_url(any()) :: binary() | nil
1183 defp get_actor_url(url) when is_binary(url), do: url
1184 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1186 defp get_actor_url(url) when is_list(url) do
1192 defp get_actor_url(_url), do: nil
1194 defp object_to_user_data(data) do
1196 data["icon"]["url"] &&
1199 "url" => [%{"href" => data["icon"]["url"]}]
1203 data["image"]["url"] &&
1206 "url" => [%{"href" => data["image"]["url"]}]
1211 |> Map.get("attachment", [])
1212 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1213 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1217 |> Map.get("tag", [])
1219 %{"type" => "Emoji"} -> true
1222 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1223 {String.trim(name, ":"), url}
1226 locked = data["manuallyApprovesFollowers"] || false
1227 accepts_chat_messages = data["acceptsChatMessages"]
1228 data = Transmogrifier.maybe_fix_user_object(data)
1229 discoverable = data["discoverable"] || false
1230 invisible = data["invisible"] || false
1231 actor_type = data["type"] || "Person"
1234 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1235 data["publicKey"]["publicKeyPem"]
1241 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1242 data["endpoints"]["sharedInbox"]
1249 uri: get_actor_url(data["url"]),
1255 discoverable: discoverable,
1256 invisible: invisible,
1259 follower_address: data["followers"],
1260 following_address: data["following"],
1261 bio: data["summary"],
1262 actor_type: actor_type,
1263 also_known_as: Map.get(data, "alsoKnownAs", []),
1264 public_key: public_key,
1265 inbox: data["inbox"],
1266 shared_inbox: shared_inbox,
1267 accepts_chat_messages: accepts_chat_messages
1270 # nickname can be nil because of virtual actors
1271 if data["preferredUsername"] do
1275 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1278 Map.put(user_data, :nickname, nil)
1282 def fetch_follow_information_for_user(user) do
1283 with {:ok, following_data} <-
1284 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1285 {:ok, hide_follows} <- collection_private(following_data),
1286 {:ok, followers_data} <-
1287 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1288 {:ok, hide_followers} <- collection_private(followers_data) do
1291 hide_follows: hide_follows,
1292 follower_count: normalize_counter(followers_data["totalItems"]),
1293 following_count: normalize_counter(following_data["totalItems"]),
1294 hide_followers: hide_followers
1297 {:error, _} = e -> e
# Returns `counter` when it is an integer and 0 for any other value
# (nil, strings, floats, ...).
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1305 def maybe_update_follow_information(user_data) do
1306 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1307 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1309 {:collections_available,
1310 !!(user_data[:following_address] && user_data[:follower_address])},
1312 fetch_follow_information_for_user(user_data) do
1313 info = Map.merge(user_data[:info] || %{}, info)
1316 |> Map.put(:info, info)
1318 {:user_type_check, false} ->
1321 {:collections_available, false} ->
1324 {:enabled, false} ->
1329 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1336 defp collection_private(%{"first" => %{"type" => type}})
1337 when type in ["CollectionPage", "OrderedCollectionPage"],
1340 defp collection_private(%{"first" => first}) do
1341 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1342 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1345 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1346 {:error, _} = e -> e
1351 defp collection_private(_data), do: {:ok, true}
1353 def user_data_from_user_object(data) do
1354 with {:ok, data} <- MRF.filter(data) do
1355 {:ok, object_to_user_data(data)}
1361 def fetch_and_prepare_user_from_ap_id(ap_id) do
1362 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1363 {:ok, data} <- user_data_from_user_object(data) do
1364 {:ok, maybe_update_follow_information(data)}
1366 {:error, "Object has been deleted" = e} ->
1367 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1371 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1376 def maybe_handle_clashing_nickname(nickname) do
1377 with %User{} = old_user <- User.get_by_nickname(nickname) do
1378 Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")
1381 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1382 |> User.update_and_set_cache()
1386 def make_user_from_ap_id(ap_id) do
1387 user = User.get_cached_by_ap_id(ap_id)
1389 if user && !User.ap_enabled?(user) do
1390 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1392 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1395 |> User.remote_user_changeset(data)
1396 |> User.update_and_set_cache()
1398 maybe_handle_clashing_nickname(data[:nickname])
1401 |> User.remote_user_changeset()
@doc """
Resolves `nickname` with `WebFinger.finger/1` and builds the user from the
resulting AP id via `make_user_from_ap_id/1`.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield
`{:ok, %{"ap_id" => ap_id}}` with a non-nil `ap_id`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when ap_id != nil ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Runs post-processing checks on a single activity for `user`.

Currently this only applies the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1427 def fetch_direct_messages_query do
1429 |> restrict_type(%{type: "Create"})
1430 |> restrict_visibility(%{visibility: "direct"})
1431 |> order_by([activity], asc: activity.id)