1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 defp increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 defp increase_poll_votes_if_vote(%{
88 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
92 Object.increase_vote_count(reply_ap_id, name, actor)
95 defp increase_poll_votes_if_vote(_create_data), do: :noop
97 @object_types ["ChatMessage"]
98 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
99 def persist(%{"type" => type} = object, meta) when type in @object_types do
100 with {:ok, object} <- Object.create(object) do
105 def persist(object, meta) do
106 with local <- Keyword.fetch!(meta, :local),
107 {recipients, _, _} <- get_recipients(object),
109 Repo.insert(%Activity{
112 recipients: recipients,
113 actor: object["actor"]
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
124 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map) do
135 recipients: recipients
138 |> maybe_create_activity_expiration()
140 # Splice in the child object if we have one.
141 activity = Maps.put_if_present(activity, :object, object)
143 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
147 %Activity{} = activity ->
150 {:fake, true, map, recipients} ->
151 activity = %Activity{
155 recipients: recipients,
159 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 def notify_and_stream(activity) do
168 Notification.create_notifications(activity)
170 conversation = create_or_bump_conversation(activity, activity.actor)
171 participations = get_participations(conversation)
173 stream_out_participations(participations)
176 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
177 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
182 defp maybe_create_activity_expiration(result), do: result
184 defp create_or_bump_conversation(activity, actor) do
185 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
186 %User{} = user <- User.get_cached_by_ap_id(actor) do
187 Participation.mark_as_read(user, conversation)
192 defp get_participations({:ok, conversation}) do
194 |> Repo.preload(:participations, force: true)
195 |> Map.get(:participations)
198 defp get_participations(_), do: []
200 def stream_out_participations(participations) do
203 |> Repo.preload(:user)
205 Streamer.stream("participation", participations)
208 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
209 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
210 conversation = Repo.preload(conversation, :participations)
213 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
218 if last_activity_id do
219 stream_out_participations(conversation.participations)
224 def stream_out_participations(_, _), do: :noop
226 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
227 when data_type in ["Create", "Announce", "Delete"] do
229 |> Topics.get_activity_topics()
230 |> Streamer.stream(activity)
233 def stream_out(_activity) do
237 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
238 def create(params, fake \\ false) do
239 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
244 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
245 additional = params[:additional] || %{}
246 # only accept false as false value
247 local = !(params[:local] == false)
248 published = params[:published]
249 quick_insert? = Config.get([:env]) == :benchmark
253 %{to: to, actor: actor, published: published, context: context, object: object},
257 with {:ok, activity} <- insert(create_data, local, fake),
258 {:fake, false, activity} <- {:fake, fake, activity},
259 _ <- increase_replies_count_if_reply(create_data),
260 _ <- increase_poll_votes_if_vote(create_data),
261 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
262 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
263 _ <- notify_and_stream(activity),
264 :ok <- maybe_federate(activity) do
267 {:quick_insert, true, activity} ->
270 {:fake, true, activity} ->
274 Repo.rollback(message)
278 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
279 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
280 additional = params[:additional] || %{}
281 # only accept false as false value
282 local = !(params[:local] == false)
283 published = params[:published]
287 %{to: to, actor: actor, published: published, context: context, object: object},
291 with {:ok, activity} <- insert(listen_data, local),
292 _ <- notify_and_stream(activity),
293 :ok <- maybe_federate(activity) do
298 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
299 def accept(params) do
300 accept_or_reject("Accept", params)
303 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
304 def reject(params) do
305 accept_or_reject("Reject", params)
308 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
309 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
310 local = Map.get(params, :local, true)
311 activity_id = Map.get(params, :activity_id, nil)
314 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
315 |> Maps.put_if_present("id", activity_id)
317 with {:ok, activity} <- insert(data, local),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_federate(activity) do
324 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
325 {:ok, Activity.t()} | {:error, any()}
326 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
327 with {:ok, result} <-
328 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
333 defp do_follow(follower, followed, activity_id, local, opts) do
334 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
335 data = make_follow_data(follower, followed, activity_id)
337 with {:ok, activity} <- insert(data, local),
338 _ <- skip_notify_and_stream || notify_and_stream(activity),
339 :ok <- maybe_federate(activity) do
342 {:error, error} -> Repo.rollback(error)
346 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
347 {:ok, Activity.t()} | nil | {:error, any()}
348 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
349 with {:ok, result} <-
350 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
355 defp do_unfollow(follower, followed, activity_id, local) do
356 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
357 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
358 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
359 {:ok, activity} <- insert(unfollow_data, local),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
365 {:error, error} -> Repo.rollback(error)
369 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
370 {:ok, Activity.t()} | {:error, any()}
371 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
372 with {:ok, result} <-
373 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
378 defp do_block(blocker, blocked, activity_id, local) do
379 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
381 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
382 unfollow(blocker, blocked, nil, local)
385 block_data = make_block_data(blocker, blocked, activity_id)
387 with {:ok, activity} <- insert(block_data, local),
388 _ <- notify_and_stream(activity),
389 :ok <- maybe_federate(activity) do
392 {:error, error} -> Repo.rollback(error)
396 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
406 # only accept false as false value
407 local = !(params[:local] == false)
408 forward = !(params[:forward] == false)
410 additional = params[:additional] || %{}
414 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
416 Map.merge(additional, %{"to" => [], "cc" => []})
419 with flag_data <- make_flag_data(params, additional),
420 {:ok, activity} <- insert(flag_data, local),
421 {:ok, stripped_activity} <- strip_report_status_data(activity),
422 _ <- notify_and_stream(activity),
423 :ok <- maybe_federate(stripped_activity) do
424 User.all_superusers()
425 |> Enum.filter(fn user -> not is_nil(user.email) end)
426 |> Enum.each(fn superuser ->
428 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
429 |> Pleroma.Emails.Mailer.deliver_async()
436 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
437 def move(%User{} = origin, %User{} = target, local \\ true) do
440 "actor" => origin.ap_id,
441 "object" => origin.ap_id,
442 "target" => target.ap_id
445 with true <- origin.ap_id in target.also_known_as,
446 {:ok, activity} <- insert(params, local),
447 _ <- notify_and_stream(activity) do
448 maybe_federate(activity)
450 BackgroundWorker.enqueue("move_following", %{
451 "origin_id" => origin.id,
452 "target_id" => target.id
457 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
462 def fetch_activities_for_context_query(context, opts) do
463 public = [Constants.as_public()]
467 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
470 from(activity in Activity)
471 |> maybe_preload_objects(opts)
472 |> maybe_preload_bookmarks(opts)
473 |> maybe_set_thread_muted_field(opts)
474 |> restrict_blocked(opts)
475 |> restrict_recipients(recipients, opts[:user])
479 "?->>'type' = ? and ?->>'context' = ?",
486 |> exclude_poll_votes(opts)
488 |> order_by([activity], desc: activity.id)
491 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
492 def fetch_activities_for_context(context, opts \\ %{}) do
494 |> fetch_activities_for_context_query(opts)
498 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
499 FlakeId.Ecto.CompatType.t() | nil
500 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
502 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
503 |> restrict_visibility(%{visibility: "direct"})
509 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
510 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
511 opts = Map.delete(opts, :user)
513 [Constants.as_public()]
514 |> fetch_activities_query(opts)
515 |> restrict_unlisted(opts)
516 |> Pagination.fetch_paginated(opts, pagination)
519 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
520 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
522 |> Map.put(:restrict_unlisted, true)
523 |> fetch_public_or_unlisted_activities(pagination)
526 @valid_visibilities ~w[direct unlisted public private]
528 defp restrict_visibility(query, %{visibility: visibility})
529 when is_list(visibility) do
530 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
535 "activity_visibility(?, ?, ?) = ANY (?)",
543 Logger.error("Could not restrict visibility to #{visibility}")
547 defp restrict_visibility(query, %{visibility: visibility})
548 when visibility in @valid_visibilities do
552 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
556 defp restrict_visibility(_query, %{visibility: visibility})
557 when visibility not in @valid_visibilities do
558 Logger.error("Could not restrict visibility to #{visibility}")
561 defp restrict_visibility(query, _visibility), do: query
563 defp exclude_visibility(query, %{exclude_visibilities: visibility})
564 when is_list(visibility) do
565 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
570 "activity_visibility(?, ?, ?) = ANY (?)",
578 Logger.error("Could not exclude visibility to #{visibility}")
583 defp exclude_visibility(query, %{exclude_visibilities: visibility})
584 when visibility in @valid_visibilities do
589 "activity_visibility(?, ?, ?) = ?",
598 defp exclude_visibility(query, %{exclude_visibilities: visibility})
599 when visibility not in [nil | @valid_visibilities] do
600 Logger.error("Could not exclude visibility to #{visibility}")
604 defp exclude_visibility(query, _visibility), do: query
606 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
609 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
612 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
615 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
619 defp restrict_thread_visibility(query, _, _), do: query
621 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
624 |> Map.put(:user, reading_user)
625 |> Map.put(:actor_id, user.ap_id)
628 godmode: params[:godmode],
629 reading_user: reading_user
631 |> user_activities_recipients()
632 |> fetch_activities(params)
636 def fetch_user_activities(user, reading_user, params \\ %{}) do
639 |> Map.put(:type, ["Create", "Announce"])
640 |> Map.put(:user, reading_user)
641 |> Map.put(:actor_id, user.ap_id)
642 |> Map.put(:pinned_activity_ids, user.pinned_activities)
645 if User.blocks?(reading_user, user) do
649 |> Map.put(:blocking_user, reading_user)
650 |> Map.put(:muting_user, reading_user)
654 godmode: params[:godmode],
655 reading_user: reading_user
657 |> user_activities_recipients()
658 |> fetch_activities(params)
662 def fetch_statuses(reading_user, params) do
663 params = Map.put(params, :type, ["Create", "Announce"])
666 godmode: params[:godmode],
667 reading_user: reading_user
669 |> user_activities_recipients()
670 |> fetch_activities(params, :offset)
674 defp user_activities_recipients(%{godmode: true}), do: []
676 defp user_activities_recipients(%{reading_user: reading_user}) do
678 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
680 [Constants.as_public()]
684 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
685 raise "Can't use the child object without preloading!"
688 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
690 [activity, object] in query,
693 "?->>'type' != ? or ?->>'actor' != ?",
702 defp restrict_announce_object_actor(query, _), do: query
704 defp restrict_since(query, %{since_id: ""}), do: query
706 defp restrict_since(query, %{since_id: since_id}) do
707 from(activity in query, where: activity.id > ^since_id)
710 defp restrict_since(query, _), do: query
712 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
713 raise "Can't use the child object without preloading!"
716 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
718 [_activity, object] in query,
719 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
723 defp restrict_tag_reject(query, _), do: query
725 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
726 raise "Can't use the child object without preloading!"
729 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
731 [_activity, object] in query,
732 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
736 defp restrict_tag_all(query, _), do: query
738 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
739 raise "Can't use the child object without preloading!"
742 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
744 [_activity, object] in query,
745 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
749 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
751 [_activity, object] in query,
752 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
756 defp restrict_tag(query, _), do: query
758 defp restrict_recipients(query, [], _user), do: query
760 defp restrict_recipients(query, recipients, nil) do
761 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
764 defp restrict_recipients(query, recipients, user) do
767 where: fragment("? && ?", ^recipients, activity.recipients),
768 or_where: activity.actor == ^user.ap_id
772 defp restrict_local(query, %{local_only: true}) do
773 from(activity in query, where: activity.local == true)
776 defp restrict_local(query, _), do: query
778 defp restrict_actor(query, %{actor_id: actor_id}) do
779 from(activity in query, where: activity.actor == ^actor_id)
782 defp restrict_actor(query, _), do: query
784 defp restrict_type(query, %{type: type}) when is_binary(type) do
785 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
788 defp restrict_type(query, %{type: type}) do
789 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
792 defp restrict_type(query, _), do: query
794 defp restrict_state(query, %{state: state}) do
795 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
798 defp restrict_state(query, _), do: query
800 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
802 [_activity, object] in query,
803 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
807 defp restrict_favorited_by(query, _), do: query
809 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
810 raise "Can't use the child object without preloading!"
813 defp restrict_media(query, %{only_media: true}) do
815 [_activity, object] in query,
816 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
820 defp restrict_media(query, _), do: query
822 defp restrict_replies(query, %{exclude_replies: true}) do
824 [_activity, object] in query,
825 where: fragment("?->>'inReplyTo' is null", object.data)
829 defp restrict_replies(query, %{
830 reply_filtering_user: user,
831 reply_visibility: "self"
834 [activity, object] in query,
837 "?->>'inReplyTo' is null OR ? = ANY(?)",
845 defp restrict_replies(query, %{
846 reply_filtering_user: user,
847 reply_visibility: "following"
850 [activity, object] in query,
853 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
855 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
864 defp restrict_replies(query, _), do: query
866 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
867 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
870 defp restrict_reblogs(query, _), do: query
872 defp restrict_muted(query, %{with_muted: true}), do: query
874 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
875 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
878 from([activity] in query,
879 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
880 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
883 unless opts[:skip_preload] do
884 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
890 defp restrict_muted(query, _), do: query
892 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
893 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
894 domain_blocks = user.domain_blocks || []
896 following_ap_ids = User.get_friends_ap_ids(user)
899 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
902 [activity, object: o] in query,
903 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
904 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
907 "recipients_contain_blocked_domains(?, ?) = false",
913 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
920 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
928 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
937 defp restrict_blocked(query, _), do: query
939 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
944 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
946 ^[Constants.as_public()]
951 defp restrict_unlisted(query, _), do: query
953 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
954 from(activity in query, where: activity.id in ^ids)
957 defp restrict_pinned(query, _), do: query
959 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
960 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
966 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
974 defp restrict_muted_reblogs(query, _), do: query
976 defp restrict_instance(query, %{instance: instance}) do
981 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
985 from(activity in query, where: activity.actor in ^users)
988 defp restrict_instance(query, _), do: query
990 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
992 defp exclude_poll_votes(query, _) do
993 if has_named_binding?(query, :object) do
994 from([activity, object: o] in query,
995 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1002 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1004 defp exclude_chat_messages(query, _) do
1005 if has_named_binding?(query, :object) do
1006 from([activity, object: o] in query,
1007 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1014 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1016 defp exclude_invisible_actors(query, _opts) do
1018 User.Query.build(%{invisible: true, select: [:ap_id]})
1020 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1022 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1025 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1026 from(activity in query, where: activity.id != ^id)
1029 defp exclude_id(query, _), do: query
1031 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1033 defp maybe_preload_objects(query, _) do
1035 |> Activity.with_preloaded_object()
1038 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1040 defp maybe_preload_bookmarks(query, opts) do
1042 |> Activity.with_preloaded_bookmark(opts[:user])
1045 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1047 |> Activity.with_preloaded_report_notes()
1050 defp maybe_preload_report_notes(query, _), do: query
1052 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1054 defp maybe_set_thread_muted_field(query, opts) do
1056 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1059 defp maybe_order(query, %{order: :desc}) do
1061 |> order_by(desc: :id)
1064 defp maybe_order(query, %{order: :asc}) do
1066 |> order_by(asc: :id)
1069 defp maybe_order(query, _), do: query
1071 defp fetch_activities_query_ap_ids_ops(opts) do
1072 source_user = opts[:muting_user]
1073 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1075 ap_id_relationships =
1076 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1077 [:block | ap_id_relationships]
1082 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1084 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1085 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1087 restrict_muted_reblogs_opts =
1088 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1090 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1093 def fetch_activities_query(recipients, opts \\ %{}) do
1094 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1095 fetch_activities_query_ap_ids_ops(opts)
1098 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1102 |> maybe_preload_objects(opts)
1103 |> maybe_preload_bookmarks(opts)
1104 |> maybe_preload_report_notes(opts)
1105 |> maybe_set_thread_muted_field(opts)
1106 |> maybe_order(opts)
1107 |> restrict_recipients(recipients, opts[:user])
1108 |> restrict_replies(opts)
1109 |> restrict_tag(opts)
1110 |> restrict_tag_reject(opts)
1111 |> restrict_tag_all(opts)
1112 |> restrict_since(opts)
1113 |> restrict_local(opts)
1114 |> restrict_actor(opts)
1115 |> restrict_type(opts)
1116 |> restrict_state(opts)
1117 |> restrict_favorited_by(opts)
1118 |> restrict_blocked(restrict_blocked_opts)
1119 |> restrict_muted(restrict_muted_opts)
1120 |> restrict_media(opts)
1121 |> restrict_visibility(opts)
1122 |> restrict_thread_visibility(opts, config)
1123 |> restrict_reblogs(opts)
1124 |> restrict_pinned(opts)
1125 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1126 |> restrict_instance(opts)
1127 |> restrict_announce_object_actor(opts)
1128 |> Activity.restrict_deactivated_users()
1129 |> exclude_poll_votes(opts)
1130 |> exclude_chat_messages(opts)
1131 |> exclude_invisible_actors(opts)
1132 |> exclude_visibility(opts)
1135 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1136 list_memberships = Pleroma.List.memberships(opts[:user])
1138 fetch_activities_query(recipients ++ list_memberships, opts)
1139 |> Pagination.fetch_paginated(opts, pagination)
1141 |> maybe_update_cc(list_memberships, opts[:user])
1145 Fetch favorites activities of user with order by sort adds to favorites
1147 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1148 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1150 |> Activity.Queries.by_actor()
1151 |> Activity.Queries.by_type("Like")
1152 |> Activity.with_joined_object()
1153 |> Object.with_joined_activity()
1154 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1155 |> order_by([like, _, _], desc_nulls_last: like.id)
1156 |> Pagination.fetch_paginated(
1157 Map.merge(params, %{skip_order: true}),
1162 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1163 Enum.map(activities, fn
1164 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1165 if Enum.any?(bcc, &(&1 in list_memberships)) do
1166 update_in(activity.data["cc"], &[user_ap_id | &1])
1176 defp maybe_update_cc(activities, _, _), do: activities
1178 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1179 from(activity in query,
1181 fragment("? && ?", activity.recipients, ^recipients) or
1182 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1183 ^Constants.as_public() in activity.recipients)
1187 def fetch_activities_bounded(
1189 recipients_with_public,
1191 pagination \\ :keyset
1193 fetch_activities_query([], opts)
1194 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1195 |> Pagination.fetch_paginated(opts, pagination)
1199 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1200 def upload(file, opts \\ []) do
1201 with {:ok, data} <- Upload.store(file, opts) do
1202 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1204 Repo.insert(%Object{data: obj_data})
1208 @spec get_actor_url(any()) :: binary() | nil
1209 defp get_actor_url(url) when is_binary(url), do: url
1210 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1212 defp get_actor_url(url) when is_list(url) do
1218 defp get_actor_url(_url), do: nil
1220 defp object_to_user_data(data) do
1222 data["icon"]["url"] &&
1225 "url" => [%{"href" => data["icon"]["url"]}]
1229 data["image"]["url"] &&
1232 "url" => [%{"href" => data["image"]["url"]}]
1237 |> Map.get("attachment", [])
1238 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1239 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1243 |> Map.get("tag", [])
1245 %{"type" => "Emoji"} -> true
1248 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1249 {String.trim(name, ":"), url}
1252 locked = data["manuallyApprovesFollowers"] || false
1253 data = Transmogrifier.maybe_fix_user_object(data)
1254 discoverable = data["discoverable"] || false
1255 invisible = data["invisible"] || false
1256 actor_type = data["type"] || "Person"
1259 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1260 data["publicKey"]["publicKeyPem"]
1266 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1267 data["endpoints"]["sharedInbox"]
1274 uri: get_actor_url(data["url"]),
1280 discoverable: discoverable,
1281 invisible: invisible,
1284 follower_address: data["followers"],
1285 following_address: data["following"],
1286 bio: data["summary"],
1287 actor_type: actor_type,
1288 also_known_as: Map.get(data, "alsoKnownAs", []),
1289 public_key: public_key,
1290 inbox: data["inbox"],
1291 shared_inbox: shared_inbox
1294 # nickname can be nil because of virtual actors
1295 if data["preferredUsername"] do
1299 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1302 Map.put(user_data, :nickname, nil)
1306 def fetch_follow_information_for_user(user) do
1307 with {:ok, following_data} <-
1308 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1309 {:ok, hide_follows} <- collection_private(following_data),
1310 {:ok, followers_data} <-
1311 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1312 {:ok, hide_followers} <- collection_private(followers_data) do
1315 hide_follows: hide_follows,
1316 follower_count: normalize_counter(followers_data["totalItems"]),
1317 following_count: normalize_counter(following_data["totalItems"]),
1318 hide_followers: hide_followers
1321 {:error, _} = e -> e
1326 defp normalize_counter(counter) when is_integer(counter), do: counter
1327 defp normalize_counter(_), do: 0
1329 def maybe_update_follow_information(user_data) do
1330 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1331 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1333 {:collections_available,
1334 !!(user_data[:following_address] && user_data[:follower_address])},
1336 fetch_follow_information_for_user(user_data) do
1337 info = Map.merge(user_data[:info] || %{}, info)
1340 |> Map.put(:info, info)
1342 {:user_type_check, false} ->
1345 {:collections_available, false} ->
1348 {:enabled, false} ->
1353 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1360 defp collection_private(%{"first" => %{"type" => type}})
1361 when type in ["CollectionPage", "OrderedCollectionPage"],
1364 defp collection_private(%{"first" => first}) do
1365 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1366 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1369 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1370 {:error, _} = e -> e
1375 defp collection_private(_data), do: {:ok, true}
1377 def user_data_from_user_object(data) do
1378 with {:ok, data} <- MRF.filter(data) do
1379 {:ok, object_to_user_data(data)}
1385 def fetch_and_prepare_user_from_ap_id(ap_id) do
1386 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1387 {:ok, data} <- user_data_from_user_object(data) do
1388 {:ok, maybe_update_follow_information(data)}
1390 {:error, "Object has been deleted" = e} ->
1391 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1395 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1400 def make_user_from_ap_id(ap_id) do
1401 user = User.get_cached_by_ap_id(ap_id)
1403 if user && !User.ap_enabled?(user) do
1404 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1406 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1409 |> User.remote_user_changeset(data)
1410 |> User.update_and_set_cache()
1413 |> User.remote_user_changeset()
1421 def make_user_from_nickname(nickname) do
1422 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1423 make_user_from_ap_id(ap_id)
1425 _e -> {:error, "No AP id in WebFinger"}
1429 # filter out broken threads
1430 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1431 entire_thread_visible_for_user?(activity, user)
1434 # do post-processing on a specific activity
1435 def contain_activity(%Activity{} = activity, %User{} = user) do
1436 contain_broken_threads(activity, user)
1439 def fetch_direct_messages_query do
1441 |> restrict_type(%{type: "Create"})
1442 |> restrict_visibility(%{visibility: "direct"})
1443 |> order_by([activity], asc: activity.id)