1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
390 {:error, error} -> Repo.rollback(error)
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id
403 with true <- origin.ap_id in target.also_known_as,
404 {:ok, activity} <- insert(params, local),
405 _ <- notify_and_stream(activity) do
406 maybe_federate(activity)
408 BackgroundWorker.enqueue("move_following", %{
409 "origin_id" => origin.id,
410 "target_id" => target.id
415 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
420 def fetch_activities_for_context_query(context, opts) do
421 public = [Constants.as_public()]
425 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
428 from(activity in Activity)
429 |> maybe_preload_objects(opts)
430 |> maybe_preload_bookmarks(opts)
431 |> maybe_set_thread_muted_field(opts)
432 |> restrict_blocked(opts)
433 |> restrict_recipients(recipients, opts[:user])
434 |> restrict_filtered(opts)
438 "?->>'type' = ? and ?->>'context' = ?",
445 |> exclude_poll_votes(opts)
447 |> order_by([activity], desc: activity.id)
450 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
451 def fetch_activities_for_context(context, opts \\ %{}) do
453 |> fetch_activities_for_context_query(opts)
457 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
458 FlakeId.Ecto.CompatType.t() | nil
459 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
461 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
462 |> restrict_visibility(%{visibility: "direct"})
468 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
469 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
470 opts = Map.delete(opts, :user)
472 [Constants.as_public()]
473 |> fetch_activities_query(opts)
474 |> restrict_unlisted(opts)
475 |> Pagination.fetch_paginated(opts, pagination)
478 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
479 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
481 |> Map.put(:restrict_unlisted, true)
482 |> fetch_public_or_unlisted_activities(pagination)
485 @valid_visibilities ~w[direct unlisted public private]
487 defp restrict_visibility(query, %{visibility: visibility})
488 when is_list(visibility) do
489 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
494 "activity_visibility(?, ?, ?) = ANY (?)",
502 Logger.error("Could not restrict visibility to #{visibility}")
506 defp restrict_visibility(query, %{visibility: visibility})
507 when visibility in @valid_visibilities do
511 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
515 defp restrict_visibility(_query, %{visibility: visibility})
516 when visibility not in @valid_visibilities do
517 Logger.error("Could not restrict visibility to #{visibility}")
520 defp restrict_visibility(query, _visibility), do: query
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when is_list(visibility) do
524 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
529 "activity_visibility(?, ?, ?) = ANY (?)",
537 Logger.error("Could not exclude visibility to #{visibility}")
542 defp exclude_visibility(query, %{exclude_visibilities: visibility})
543 when visibility in @valid_visibilities do
548 "activity_visibility(?, ?, ?) = ?",
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility not in [nil | @valid_visibilities] do
559 Logger.error("Could not exclude visibility to #{visibility}")
563 defp exclude_visibility(query, _visibility), do: query
565 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
568 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
571 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
574 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
578 defp restrict_thread_visibility(query, _, _), do: query
580 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
583 |> Map.put(:user, reading_user)
584 |> Map.put(:actor_id, user.ap_id)
587 godmode: params[:godmode],
588 reading_user: reading_user
590 |> user_activities_recipients()
591 |> fetch_activities(params)
595 def fetch_user_activities(user, reading_user, params \\ %{})
597 def fetch_user_activities(user, reading_user, %{total: true} = params) do
598 result = fetch_activities_for_user(user, reading_user, params)
600 Keyword.put(result, :items, Enum.reverse(result[:items]))
603 def fetch_user_activities(user, reading_user, params) do
605 |> fetch_activities_for_user(reading_user, params)
609 defp fetch_activities_for_user(user, reading_user, params) do
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
618 if User.blocks?(reading_user, user) do
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
626 pagination_type = Map.get(params, :pagination_type) || :keyset
629 godmode: params[:godmode],
630 reading_user: reading_user
632 |> user_activities_recipients()
633 |> fetch_activities(params, pagination_type)
636 def fetch_statuses(reading_user, %{total: true} = params) do
637 result = fetch_activities_for_reading_user(reading_user, params)
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
641 def fetch_statuses(reading_user, params) do
643 |> fetch_activities_for_reading_user(params)
647 defp fetch_activities_for_reading_user(reading_user, params) do
648 params = Map.put(params, :type, ["Create", "Announce"])
651 godmode: params[:godmode],
652 reading_user: reading_user
654 |> user_activities_recipients()
655 |> fetch_activities(params, :offset)
658 defp user_activities_recipients(%{godmode: true}), do: []
660 defp user_activities_recipients(%{reading_user: reading_user}) do
662 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
664 [Constants.as_public()]
668 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
669 raise "Can't use the child object without preloading!"
672 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
674 [activity, object] in query,
677 "?->>'type' != ? or ?->>'actor' != ?",
686 defp restrict_announce_object_actor(query, _), do: query
688 defp restrict_since(query, %{since_id: ""}), do: query
690 defp restrict_since(query, %{since_id: since_id}) do
691 from(activity in query, where: activity.id > ^since_id)
694 defp restrict_since(query, _), do: query
696 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
697 raise_on_missing_preload()
700 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
702 [_activity, object] in query,
703 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
707 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
708 restrict_embedded_tag_any(query, %{tag: tag})
711 defp restrict_embedded_tag_all(query, _), do: query
713 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
714 raise_on_missing_preload()
717 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag}) do
719 [_activity, object] in query,
720 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
724 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
725 restrict_embedded_tag_any(query, %{tag: [tag]})
728 defp restrict_embedded_tag_any(query, _), do: query
730 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
731 raise_on_missing_preload()
734 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
736 [_activity, object] in query,
737 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
741 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
742 when is_binary(tag_reject) do
743 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
746 defp restrict_embedded_tag_reject_any(query, _), do: query
748 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
749 raise_on_missing_preload()
752 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
754 [_activity, object] in query,
758 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
759 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
760 AND hashtags_objects.object_id = ?) @> ?
769 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
770 restrict_hashtag_any(query, %{tag: tag})
773 defp restrict_hashtag_all(query, _), do: query
775 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
776 raise_on_missing_preload()
779 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
781 [_activity, object] in query,
785 EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
786 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
787 AND hashtags_objects.object_id = ? LIMIT 1)
795 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
796 restrict_hashtag_any(query, %{tag: [tag]})
799 defp restrict_hashtag_any(query, _), do: query
801 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
802 raise_on_missing_preload()
805 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
807 [_activity, object] in query,
811 NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
812 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
813 AND hashtags_objects.object_id = ? LIMIT 1)
821 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
822 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
825 defp restrict_hashtag_reject_any(query, _), do: query
827 defp raise_on_missing_preload do
828 raise "Can't use the child object without preloading!"
831 defp restrict_recipients(query, [], _user), do: query
833 defp restrict_recipients(query, recipients, nil) do
834 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
837 defp restrict_recipients(query, recipients, user) do
840 where: fragment("? && ?", ^recipients, activity.recipients),
841 or_where: activity.actor == ^user.ap_id
845 defp restrict_local(query, %{local_only: true}) do
846 from(activity in query, where: activity.local == true)
849 defp restrict_local(query, _), do: query
851 defp restrict_remote(query, %{remote: true}) do
852 from(activity in query, where: activity.local == false)
855 defp restrict_remote(query, _), do: query
857 defp restrict_actor(query, %{actor_id: actor_id}) do
858 from(activity in query, where: activity.actor == ^actor_id)
861 defp restrict_actor(query, _), do: query
863 defp restrict_type(query, %{type: type}) when is_binary(type) do
864 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
867 defp restrict_type(query, %{type: type}) do
868 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
871 defp restrict_type(query, _), do: query
873 defp restrict_state(query, %{state: state}) do
874 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
877 defp restrict_state(query, _), do: query
879 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
881 [_activity, object] in query,
882 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
886 defp restrict_favorited_by(query, _), do: query
888 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
889 raise "Can't use the child object without preloading!"
892 defp restrict_media(query, %{only_media: true}) do
894 [activity, object] in query,
895 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
896 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
900 defp restrict_media(query, _), do: query
902 defp restrict_replies(query, %{exclude_replies: true}) do
904 [_activity, object] in query,
905 where: fragment("?->>'inReplyTo' is null", object.data)
909 defp restrict_replies(query, %{
910 reply_filtering_user: %User{} = user,
911 reply_visibility: "self"
914 [activity, object] in query,
917 "?->>'inReplyTo' is null OR ? = ANY(?)",
925 defp restrict_replies(query, %{
926 reply_filtering_user: %User{} = user,
927 reply_visibility: "following"
930 [activity, object] in query,
934 ?->>'type' != 'Create' -- This isn't a Create
935 OR ?->>'inReplyTo' is null -- this isn't a reply
936 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
937 -- unless they are the author (because authors
938 -- are also part of the recipients). This leads
939 -- to a bug that self-replies by friends won't
941 OR ? = ? -- The actor is us
945 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
954 defp restrict_replies(query, _), do: query
956 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
957 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
960 defp restrict_reblogs(query, _), do: query
962 defp restrict_muted(query, %{with_muted: true}), do: query
964 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
965 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
968 from([activity] in query,
969 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
972 "not (?->'to' \\?| ?) or ? = ?",
980 unless opts[:skip_preload] do
981 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
987 defp restrict_muted(query, _), do: query
989 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
990 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
991 domain_blocks = user.domain_blocks || []
993 following_ap_ids = User.get_friends_ap_ids(user)
996 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
999 [activity, object: o] in query,
1000 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1003 "((not (? && ?)) or ? = ?)",
1004 activity.recipients,
1011 "recipients_contain_blocked_domains(?, ?) = false",
1012 activity.recipients,
1017 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1024 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1032 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1041 defp restrict_blocked(query, _), do: query
1043 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1048 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1050 ^[Constants.as_public()]
1055 defp restrict_unlisted(query, _), do: query
1057 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1058 from(activity in query, where: activity.id in ^ids)
1061 defp restrict_pinned(query, _), do: query
1063 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1064 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1070 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1078 defp restrict_muted_reblogs(query, _), do: query
1080 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1083 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1087 defp restrict_instance(query, _), do: query
1089 defp restrict_filtered(query, %{user: %User{} = user}) do
1090 case Filter.compose_regex(user) do
1095 from([activity, object] in query,
1097 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1098 activity.actor == ^user.ap_id
1103 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1104 restrict_filtered(query, %{user: user})
1107 defp restrict_filtered(query, _), do: query
1109 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1111 defp exclude_poll_votes(query, _) do
1112 if has_named_binding?(query, :object) do
1113 from([activity, object: o] in query,
1114 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1121 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1123 defp exclude_chat_messages(query, _) do
1124 if has_named_binding?(query, :object) do
1125 from([activity, object: o] in query,
1126 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1133 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1135 defp exclude_invisible_actors(query, _opts) do
1137 User.Query.build(%{invisible: true, select: [:ap_id]})
1139 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1141 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1144 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1145 from(activity in query, where: activity.id != ^id)
1148 defp exclude_id(query, _), do: query
1150 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1152 defp maybe_preload_objects(query, _) do
1154 |> Activity.with_preloaded_object()
1157 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1159 defp maybe_preload_bookmarks(query, opts) do
1161 |> Activity.with_preloaded_bookmark(opts[:user])
1164 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1166 |> Activity.with_preloaded_report_notes()
1169 defp maybe_preload_report_notes(query, _), do: query
1171 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1173 defp maybe_set_thread_muted_field(query, opts) do
1175 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1178 defp maybe_order(query, %{order: :desc}) do
1180 |> order_by(desc: :id)
1183 defp maybe_order(query, %{order: :asc}) do
1185 |> order_by(asc: :id)
1188 defp maybe_order(query, _), do: query
1190 defp fetch_activities_query_ap_ids_ops(opts) do
1191 source_user = opts[:muting_user]
1192 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1194 ap_id_relationships =
1195 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1196 [:block | ap_id_relationships]
1201 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1203 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1204 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1206 restrict_muted_reblogs_opts =
1207 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1209 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1212 def fetch_activities_query(recipients, opts \\ %{}) do
1213 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1214 fetch_activities_query_ap_ids_ops(opts)
1217 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1222 |> maybe_preload_objects(opts)
1223 |> maybe_preload_bookmarks(opts)
1224 |> maybe_preload_report_notes(opts)
1225 |> maybe_set_thread_muted_field(opts)
1226 |> maybe_order(opts)
1227 |> restrict_recipients(recipients, opts[:user])
1228 |> restrict_replies(opts)
1229 |> restrict_since(opts)
1230 |> restrict_local(opts)
1231 |> restrict_remote(opts)
1232 |> restrict_actor(opts)
1233 |> restrict_type(opts)
1234 |> restrict_state(opts)
1235 |> restrict_favorited_by(opts)
1236 |> restrict_blocked(restrict_blocked_opts)
1237 |> restrict_muted(restrict_muted_opts)
1238 |> restrict_filtered(opts)
1239 |> restrict_media(opts)
1240 |> restrict_visibility(opts)
1241 |> restrict_thread_visibility(opts, config)
1242 |> restrict_reblogs(opts)
1243 |> restrict_pinned(opts)
1244 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1245 |> restrict_instance(opts)
1246 |> restrict_announce_object_actor(opts)
1247 |> restrict_filtered(opts)
1248 |> Activity.restrict_deactivated_users()
1249 |> exclude_poll_votes(opts)
1250 |> exclude_chat_messages(opts)
1251 |> exclude_invisible_actors(opts)
1252 |> exclude_visibility(opts)
1254 if Config.get([:database, :improved_hashtag_timeline]) do
1256 |> restrict_hashtag_any(opts)
1257 |> restrict_hashtag_all(opts)
1258 |> restrict_hashtag_reject_any(opts)
1261 |> restrict_embedded_tag_any(opts)
1262 |> restrict_embedded_tag_all(opts)
1263 |> restrict_embedded_tag_reject_any(opts)
1267 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1268 list_memberships = Pleroma.List.memberships(opts[:user])
1270 fetch_activities_query(recipients ++ list_memberships, opts)
1271 |> Pagination.fetch_paginated(opts, pagination)
1273 |> maybe_update_cc(list_memberships, opts[:user])
1277 Fetch favorites activities of user with order by sort adds to favorites
1279 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1280 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1282 |> Activity.Queries.by_actor()
1283 |> Activity.Queries.by_type("Like")
1284 |> Activity.with_joined_object()
1285 |> Object.with_joined_activity()
1286 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1287 |> order_by([like, _, _], desc_nulls_last: like.id)
1288 |> Pagination.fetch_paginated(
1289 Map.merge(params, %{skip_order: true}),
1294 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1295 Enum.map(activities, fn
1296 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1297 if Enum.any?(bcc, &(&1 in list_memberships)) do
1298 update_in(activity.data["cc"], &[user_ap_id | &1])
1308 defp maybe_update_cc(activities, _, _), do: activities
1310 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1311 from(activity in query,
1313 fragment("? && ?", activity.recipients, ^recipients) or
1314 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1315 ^Constants.as_public() in activity.recipients)
1319 def fetch_activities_bounded(
1321 recipients_with_public,
1323 pagination \\ :keyset
1325 fetch_activities_query([], opts)
1326 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1327 |> Pagination.fetch_paginated(opts, pagination)
1331 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1332 def upload(file, opts \\ []) do
1333 with {:ok, data} <- Upload.store(file, opts) do
1334 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1336 Repo.insert(%Object{data: obj_data})
1340 @spec get_actor_url(any()) :: binary() | nil
1341 defp get_actor_url(url) when is_binary(url), do: url
1342 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1344 defp get_actor_url(url) when is_list(url) do
1350 defp get_actor_url(_url), do: nil
1352 defp object_to_user_data(data) do
1354 data["icon"]["url"] &&
1357 "url" => [%{"href" => data["icon"]["url"]}]
1361 data["image"]["url"] &&
1364 "url" => [%{"href" => data["image"]["url"]}]
1369 |> Map.get("attachment", [])
1370 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1371 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1375 |> Map.get("tag", [])
1377 %{"type" => "Emoji"} -> true
1380 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1381 {String.trim(name, ":"), url}
1384 is_locked = data["manuallyApprovesFollowers"] || false
1385 capabilities = data["capabilities"] || %{}
1386 accepts_chat_messages = capabilities["acceptsChatMessages"]
1387 data = Transmogrifier.maybe_fix_user_object(data)
1388 is_discoverable = data["discoverable"] || false
1389 invisible = data["invisible"] || false
1390 actor_type = data["type"] || "Person"
1393 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1394 data["publicKey"]["publicKeyPem"]
1400 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1401 data["endpoints"]["sharedInbox"]
1408 uri: get_actor_url(data["url"]),
1413 is_locked: is_locked,
1414 is_discoverable: is_discoverable,
1415 invisible: invisible,
1418 follower_address: data["followers"],
1419 following_address: data["following"],
1420 bio: data["summary"] || "",
1421 actor_type: actor_type,
1422 also_known_as: Map.get(data, "alsoKnownAs", []),
1423 public_key: public_key,
1424 inbox: data["inbox"],
1425 shared_inbox: shared_inbox,
1426 accepts_chat_messages: accepts_chat_messages
1429 # nickname can be nil because of virtual actors
1430 if data["preferredUsername"] do
1434 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1437 Map.put(user_data, :nickname, nil)
1441 def fetch_follow_information_for_user(user) do
1442 with {:ok, following_data} <-
1443 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1444 {:ok, hide_follows} <- collection_private(following_data),
1445 {:ok, followers_data} <-
1446 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1447 {:ok, hide_followers} <- collection_private(followers_data) do
1450 hide_follows: hide_follows,
1451 follower_count: normalize_counter(followers_data["totalItems"]),
1452 following_count: normalize_counter(following_data["totalItems"]),
1453 hide_followers: hide_followers
1456 {:error, _} = e -> e
1461 defp normalize_counter(counter) when is_integer(counter), do: counter
1462 defp normalize_counter(_), do: 0
1464 def maybe_update_follow_information(user_data) do
1465 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1466 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1468 {:collections_available,
1469 !!(user_data[:following_address] && user_data[:follower_address])},
1471 fetch_follow_information_for_user(user_data) do
1472 info = Map.merge(user_data[:info] || %{}, info)
1475 |> Map.put(:info, info)
1477 {:user_type_check, false} ->
1480 {:collections_available, false} ->
1483 {:enabled, false} ->
1488 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1495 defp collection_private(%{"first" => %{"type" => type}})
1496 when type in ["CollectionPage", "OrderedCollectionPage"],
1499 defp collection_private(%{"first" => first}) do
1500 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1501 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1504 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1505 {:error, _} = e -> e
1510 defp collection_private(_data), do: {:ok, true}
1512 def user_data_from_user_object(data) do
1513 with {:ok, data} <- MRF.filter(data) do
1514 {:ok, object_to_user_data(data)}
1520 def fetch_and_prepare_user_from_ap_id(ap_id) do
1521 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1522 {:ok, data} <- user_data_from_user_object(data) do
1523 {:ok, maybe_update_follow_information(data)}
1525 # If this has been deleted, only log a debug and not an error
1526 {:error, "Object has been deleted" = e} ->
1527 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1530 {:error, {:reject, reason} = e} ->
1531 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1535 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1540 def maybe_handle_clashing_nickname(data) do
1541 with nickname when is_binary(nickname) <- data[:nickname],
1542 %User{} = old_user <- User.get_by_nickname(nickname),
1543 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1545 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1551 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1552 |> User.update_and_set_cache()
1554 {:ap_id_comparison, true} ->
1556 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1564 def make_user_from_ap_id(ap_id) do
1565 user = User.get_cached_by_ap_id(ap_id)
1567 if user && !User.ap_enabled?(user) do
1568 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1570 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1573 |> User.remote_user_changeset(data)
1574 |> User.update_and_set_cache()
1576 maybe_handle_clashing_nickname(data)
1579 |> User.remote_user_changeset()
1587 def make_user_from_nickname(nickname) do
1588 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1589 make_user_from_ap_id(ap_id)
1591 _e -> {:error, "No AP id in WebFinger"}
1595 # filter out broken threads
1596 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1597 entire_thread_visible_for_user?(activity, user)
1600 # do post-processing on a specific activity
1601 def contain_activity(%Activity{} = activity, %User{} = user) do
1602 contain_broken_threads(activity, user)
1605 def fetch_direct_messages_query do
1607 |> restrict_type(%{type: "Create"})
1608 |> restrict_visibility(%{visibility: "direct"})
1609 |> order_by([activity], asc: activity.id)