1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
389 {:error, error} -> Repo.rollback(error)
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
437 "?->>'type' = ? and ?->>'context' = ?",
444 |> exclude_poll_votes(opts)
446 |> order_by([activity], desc: activity.id)
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(opts)
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
484 @valid_visibilities ~w[direct unlisted public private]
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
493 "activity_visibility(?, ?, ?) = ANY (?)",
501 Logger.error("Could not restrict visibility to #{visibility}")
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
519 defp restrict_visibility(query, _visibility), do: query
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 "activity_visibility(?, ?, ?) = ANY (?)",
536 Logger.error("Could not exclude visibility to #{visibility}")
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
547 "activity_visibility(?, ?, ?) = ?",
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
562 defp exclude_visibility(query, _visibility), do: query
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
577 defp restrict_thread_visibility(query, _, _), do: query
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
586 godmode: params[:godmode],
587 reading_user: reading_user
589 |> user_activities_recipients()
590 |> fetch_activities(params)
594 def fetch_user_activities(user, reading_user, params \\ %{})
596 def fetch_user_activities(user, reading_user, %{total: true} = params) do
597 result = fetch_activities_for_user(user, reading_user, params)
599 Keyword.put(result, :items, Enum.reverse(result[:items]))
602 def fetch_user_activities(user, reading_user, params) do
604 |> fetch_activities_for_user(reading_user, params)
608 defp fetch_activities_for_user(user, reading_user, params) do
611 |> Map.put(:type, ["Create", "Announce"])
612 |> Map.put(:user, reading_user)
613 |> Map.put(:actor_id, user.ap_id)
614 |> Map.put(:pinned_activity_ids, user.pinned_activities)
617 if User.blocks?(reading_user, user) do
621 |> Map.put(:blocking_user, reading_user)
622 |> Map.put(:muting_user, reading_user)
625 pagination_type = Map.get(params, :pagination_type) || :keyset
628 godmode: params[:godmode],
629 reading_user: reading_user
631 |> user_activities_recipients()
632 |> fetch_activities(params, pagination_type)
635 def fetch_statuses(reading_user, %{total: true} = params) do
636 result = fetch_activities_for_reading_user(reading_user, params)
637 Keyword.put(result, :items, Enum.reverse(result[:items]))
640 def fetch_statuses(reading_user, params) do
642 |> fetch_activities_for_reading_user(params)
646 defp fetch_activities_for_reading_user(reading_user, params) do
647 params = Map.put(params, :type, ["Create", "Announce"])
650 godmode: params[:godmode],
651 reading_user: reading_user
653 |> user_activities_recipients()
654 |> fetch_activities(params, :offset)
657 defp user_activities_recipients(%{godmode: true}), do: []
659 defp user_activities_recipients(%{reading_user: reading_user}) do
661 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
663 [Constants.as_public()]
667 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
668 raise "Can't use the child object without preloading!"
671 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
673 [activity, object] in query,
676 "?->>'type' != ? or ?->>'actor' != ?",
685 defp restrict_announce_object_actor(query, _), do: query
687 defp restrict_since(query, %{since_id: ""}), do: query
689 defp restrict_since(query, %{since_id: since_id}) do
690 from(activity in query, where: activity.id > ^since_id)
693 defp restrict_since(query, _), do: query
695 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
696 raise "Can't use the child object without preloading!"
699 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
701 [_activity, object] in query,
702 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
706 defp restrict_tag_reject(query, _), do: query
708 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
709 raise "Can't use the child object without preloading!"
712 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
714 [_activity, object] in query,
715 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
719 defp restrict_tag_all(query, _), do: query
721 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
722 raise "Can't use the child object without preloading!"
725 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
727 [_activity, object] in query,
728 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
732 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
734 [_activity, object] in query,
735 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
739 defp restrict_tag(query, _), do: query
741 defp restrict_recipients(query, [], _user), do: query
743 defp restrict_recipients(query, recipients, nil) do
744 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
747 defp restrict_recipients(query, recipients, user) do
750 where: fragment("? && ?", ^recipients, activity.recipients),
751 or_where: activity.actor == ^user.ap_id
755 defp restrict_local(query, %{local_only: true}) do
756 from(activity in query, where: activity.local == true)
759 defp restrict_local(query, _), do: query
761 defp restrict_remote(query, %{remote: true}) do
762 from(activity in query, where: activity.local == false)
765 defp restrict_remote(query, _), do: query
767 defp restrict_actor(query, %{actor_id: actor_id}) do
768 from(activity in query, where: activity.actor == ^actor_id)
771 defp restrict_actor(query, _), do: query
773 defp restrict_type(query, %{type: type}) when is_binary(type) do
774 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
777 defp restrict_type(query, %{type: type}) do
778 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
781 defp restrict_type(query, _), do: query
783 defp restrict_state(query, %{state: state}) do
784 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
787 defp restrict_state(query, _), do: query
789 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
791 [_activity, object] in query,
792 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
796 defp restrict_favorited_by(query, _), do: query
798 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
799 raise "Can't use the child object without preloading!"
802 defp restrict_media(query, %{only_media: true}) do
804 [activity, object] in query,
805 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
806 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
810 defp restrict_media(query, _), do: query
812 defp restrict_replies(query, %{exclude_replies: true}) do
814 [_activity, object] in query,
815 where: fragment("?->>'inReplyTo' is null", object.data)
819 defp restrict_replies(query, %{
820 reply_filtering_user: %User{} = user,
821 reply_visibility: "self"
824 [activity, object] in query,
827 "?->>'inReplyTo' is null OR ? = ANY(?)",
835 defp restrict_replies(query, %{
836 reply_filtering_user: %User{} = user,
837 reply_visibility: "following"
840 [activity, object] in query,
844 ?->>'type' != 'Create' -- This isn't a Create
845 OR ?->>'inReplyTo' is null -- this isn't a reply
846 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
847 -- unless they are the author (because authors
848 -- are also part of the recipients). This leads
849 -- to a bug that self-replies by friends won't
851 OR ? = ? -- The actor is us
855 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
864 defp restrict_replies(query, _), do: query
866 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
867 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
870 defp restrict_reblogs(query, _), do: query
872 defp restrict_muted(query, %{with_muted: true}), do: query
874 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
875 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
878 from([activity] in query,
879 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
882 "not (?->'to' \\?| ?) or ? = ?",
890 unless opts[:skip_preload] do
891 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
897 defp restrict_muted(query, _), do: query
899 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
900 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
901 domain_blocks = user.domain_blocks || []
903 following_ap_ids = User.get_friends_ap_ids(user)
906 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
909 [activity, object: o] in query,
910 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
913 "((not (? && ?)) or ? = ?)",
921 "recipients_contain_blocked_domains(?, ?) = false",
927 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
934 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
942 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
951 defp restrict_blocked(query, _), do: query
953 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
958 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
960 ^[Constants.as_public()]
965 defp restrict_unlisted(query, _), do: query
967 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
968 from(activity in query, where: activity.id in ^ids)
971 defp restrict_pinned(query, _), do: query
973 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
974 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
980 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
988 defp restrict_muted_reblogs(query, _), do: query
990 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
993 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
997 defp restrict_instance(query, _), do: query
999 defp restrict_filtered(query, %{user: %User{} = user}) do
1000 case Filter.compose_regex(user) do
1005 from([activity, object] in query,
1007 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1008 activity.actor == ^user.ap_id
1013 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1014 restrict_filtered(query, %{user: user})
1017 defp restrict_filtered(query, _), do: query
1019 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1021 defp exclude_poll_votes(query, _) do
1022 if has_named_binding?(query, :object) do
1023 from([activity, object: o] in query,
1024 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1031 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1033 defp exclude_chat_messages(query, _) do
1034 if has_named_binding?(query, :object) do
1035 from([activity, object: o] in query,
1036 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1043 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1045 defp exclude_invisible_actors(query, _opts) do
1047 User.Query.build(%{invisible: true, select: [:ap_id]})
1049 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1051 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1054 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1055 from(activity in query, where: activity.id != ^id)
1058 defp exclude_id(query, _), do: query
1060 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1062 defp maybe_preload_objects(query, _) do
1064 |> Activity.with_preloaded_object()
1067 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1069 defp maybe_preload_bookmarks(query, opts) do
1071 |> Activity.with_preloaded_bookmark(opts[:user])
1074 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1076 |> Activity.with_preloaded_report_notes()
1079 defp maybe_preload_report_notes(query, _), do: query
1081 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1083 defp maybe_set_thread_muted_field(query, opts) do
1085 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1088 defp maybe_order(query, %{order: :desc}) do
1090 |> order_by(desc: :id)
1093 defp maybe_order(query, %{order: :asc}) do
1095 |> order_by(asc: :id)
1098 defp maybe_order(query, _), do: query
1100 defp fetch_activities_query_ap_ids_ops(opts) do
1101 source_user = opts[:muting_user]
1102 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1104 ap_id_relationships =
1105 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1106 [:block | ap_id_relationships]
1111 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1113 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1114 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1116 restrict_muted_reblogs_opts =
1117 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1119 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1122 def fetch_activities_query(recipients, opts \\ %{}) do
1123 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1124 fetch_activities_query_ap_ids_ops(opts)
1127 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1131 |> maybe_preload_objects(opts)
1132 |> maybe_preload_bookmarks(opts)
1133 |> maybe_preload_report_notes(opts)
1134 |> maybe_set_thread_muted_field(opts)
1135 |> maybe_order(opts)
1136 |> restrict_recipients(recipients, opts[:user])
1137 |> restrict_replies(opts)
1138 |> restrict_tag(opts)
1139 |> restrict_tag_reject(opts)
1140 |> restrict_tag_all(opts)
1141 |> restrict_since(opts)
1142 |> restrict_local(opts)
1143 |> restrict_remote(opts)
1144 |> restrict_actor(opts)
1145 |> restrict_type(opts)
1146 |> restrict_state(opts)
1147 |> restrict_favorited_by(opts)
1148 |> restrict_blocked(restrict_blocked_opts)
1149 |> restrict_muted(restrict_muted_opts)
1150 |> restrict_filtered(opts)
1151 |> restrict_media(opts)
1152 |> restrict_visibility(opts)
1153 |> restrict_thread_visibility(opts, config)
1154 |> restrict_reblogs(opts)
1155 |> restrict_pinned(opts)
1156 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1157 |> restrict_instance(opts)
1158 |> restrict_announce_object_actor(opts)
1159 |> restrict_filtered(opts)
1160 |> Activity.restrict_deactivated_users()
1161 |> exclude_poll_votes(opts)
1162 |> exclude_chat_messages(opts)
1163 |> exclude_invisible_actors(opts)
1164 |> exclude_visibility(opts)
1167 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1168 list_memberships = Pleroma.List.memberships(opts[:user])
1170 fetch_activities_query(recipients ++ list_memberships, opts)
1171 |> Pagination.fetch_paginated(opts, pagination)
1173 |> maybe_update_cc(list_memberships, opts[:user])
1177 Fetch favorites activities of user with order by sort adds to favorites
1179 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1180 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1182 |> Activity.Queries.by_actor()
1183 |> Activity.Queries.by_type("Like")
1184 |> Activity.with_joined_object()
1185 |> Object.with_joined_activity()
1186 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1187 |> order_by([like, _, _], desc_nulls_last: like.id)
1188 |> Pagination.fetch_paginated(
1189 Map.merge(params, %{skip_order: true}),
1194 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1195 Enum.map(activities, fn
1196 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1197 if Enum.any?(bcc, &(&1 in list_memberships)) do
1198 update_in(activity.data["cc"], &[user_ap_id | &1])
1208 defp maybe_update_cc(activities, _, _), do: activities
1210 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1211 from(activity in query,
1213 fragment("? && ?", activity.recipients, ^recipients) or
1214 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1215 ^Constants.as_public() in activity.recipients)
1219 def fetch_activities_bounded(
1221 recipients_with_public,
1223 pagination \\ :keyset
1225 fetch_activities_query([], opts)
1226 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1227 |> Pagination.fetch_paginated(opts, pagination)
1231 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1232 def upload(file, opts \\ []) do
1233 with {:ok, data} <- Upload.store(file, opts) do
1234 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1236 Repo.insert(%Object{data: obj_data})
1240 @spec get_actor_url(any()) :: binary() | nil
1241 defp get_actor_url(url) when is_binary(url), do: url
1242 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1244 defp get_actor_url(url) when is_list(url) do
1250 defp get_actor_url(_url), do: nil
1252 defp object_to_user_data(data) do
1254 data["icon"]["url"] &&
1257 "url" => [%{"href" => data["icon"]["url"]}]
1261 data["image"]["url"] &&
1264 "url" => [%{"href" => data["image"]["url"]}]
1269 |> Map.get("attachment", [])
1270 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1271 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1275 |> Map.get("tag", [])
1277 %{"type" => "Emoji"} -> true
1280 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1281 {String.trim(name, ":"), url}
1284 is_locked = data["manuallyApprovesFollowers"] || false
1285 capabilities = data["capabilities"] || %{}
1286 accepts_chat_messages = capabilities["acceptsChatMessages"]
1287 data = Transmogrifier.maybe_fix_user_object(data)
1288 is_discoverable = data["discoverable"] || false
1289 invisible = data["invisible"] || false
1290 actor_type = data["type"] || "Person"
1293 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1294 data["publicKey"]["publicKeyPem"]
1300 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1301 data["endpoints"]["sharedInbox"]
1308 uri: get_actor_url(data["url"]),
1313 is_locked: is_locked,
1314 is_discoverable: is_discoverable,
1315 invisible: invisible,
1318 follower_address: data["followers"],
1319 following_address: data["following"],
1320 bio: data["summary"] || "",
1321 actor_type: actor_type,
1322 also_known_as: Map.get(data, "alsoKnownAs", []),
1323 public_key: public_key,
1324 inbox: data["inbox"],
1325 shared_inbox: shared_inbox,
1326 accepts_chat_messages: accepts_chat_messages
1329 # nickname can be nil because of virtual actors
1330 if data["preferredUsername"] do
1334 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1337 Map.put(user_data, :nickname, nil)
1341 def fetch_follow_information_for_user(user) do
1342 with {:ok, following_data} <-
1343 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1344 {:ok, hide_follows} <- collection_private(following_data),
1345 {:ok, followers_data} <-
1346 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1347 {:ok, hide_followers} <- collection_private(followers_data) do
1350 hide_follows: hide_follows,
1351 follower_count: normalize_counter(followers_data["totalItems"]),
1352 following_count: normalize_counter(following_data["totalItems"]),
1353 hide_followers: hide_followers
1356 {:error, _} = e -> e
1361 defp normalize_counter(counter) when is_integer(counter), do: counter
1362 defp normalize_counter(_), do: 0
1364 def maybe_update_follow_information(user_data) do
1365 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1366 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1368 {:collections_available,
1369 !!(user_data[:following_address] && user_data[:follower_address])},
1371 fetch_follow_information_for_user(user_data) do
1372 info = Map.merge(user_data[:info] || %{}, info)
1375 |> Map.put(:info, info)
1377 {:user_type_check, false} ->
1380 {:collections_available, false} ->
1383 {:enabled, false} ->
1388 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1395 defp collection_private(%{"first" => %{"type" => type}})
1396 when type in ["CollectionPage", "OrderedCollectionPage"],
1399 defp collection_private(%{"first" => first}) do
1400 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1401 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1404 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1405 {:error, _} = e -> e
1410 defp collection_private(_data), do: {:ok, true}
1412 def user_data_from_user_object(data) do
1413 with {:ok, data} <- MRF.filter(data) do
1414 {:ok, object_to_user_data(data)}
1420 def fetch_and_prepare_user_from_ap_id(ap_id) do
1421 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1422 {:ok, data} <- user_data_from_user_object(data) do
1423 {:ok, maybe_update_follow_information(data)}
1425 # If this has been deleted, only log a debug and not an error
1426 {:error, "Object has been deleted" = e} ->
1427 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1430 {:error, {:reject, reason} = e} ->
1431 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1435 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1440 def maybe_handle_clashing_nickname(data) do
1441 with nickname when is_binary(nickname) <- data[:nickname],
1442 %User{} = old_user <- User.get_by_nickname(nickname),
1443 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1445 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1451 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1452 |> User.update_and_set_cache()
1454 {:ap_id_comparison, true} ->
1456 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1464 def make_user_from_ap_id(ap_id) do
1465 user = User.get_cached_by_ap_id(ap_id)
1467 if user && !User.ap_enabled?(user) do
1468 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1470 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1473 |> User.remote_user_changeset(data)
1474 |> User.update_and_set_cache()
1476 maybe_handle_clashing_nickname(data)
1479 |> User.remote_user_changeset()
1487 def make_user_from_nickname(nickname) do
1488 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1489 make_user_from_ap_id(ap_id)
1491 _e -> {:error, "No AP id in WebFinger"}
1495 # filter out broken threads
1496 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1497 entire_thread_visible_for_user?(activity, user)
1500 # do post-processing on a specific activity
1501 def contain_activity(%Activity{} = activity, %User{} = user) do
1502 contain_broken_threads(activity, user)
1505 def fetch_direct_messages_query do
1507 |> restrict_type(%{type: "Create"})
1508 |> restrict_visibility(%{visibility: "direct"})
1509 |> order_by([activity], asc: activity.id)