1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Builds the recipient list for a "Create" activity: the "to", "cc" and "bcc"
# fields (each defaulting to []) plus the actor, concatenated and de-duplicated.
# NOTE(review): the lines holding the return value are not visible in this
# listing; callers below match the result as a 3-tuple `{recipients, _, _}`.
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
# NOTE(review): when "actor" is absent the default [] is wrapped as [[]] on the
# next line, so an empty list would end up among the recipients — confirm that
# "actor" is always present for Create activities.
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Every other activity type: "to", "cc" and "bcc" concatenated, without the
# actor and without de-duplication.
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
# Decides whether the activity's actor may insert it.
# "Delete" and "Undo" activities are always allowed.
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
# For a binary actor id the cached user is looked up; a user with
# `is_active: true` is allowed. The other case branches are not visible in
# this listing.
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
# Any other shape (for example no binary "actor") is allowed.
67 defp check_actor_can_insert(_), do: true
# Enforces the configured [:instance, :remote_limit] on an embedded object's
# "content": passes when String.length/1 of the content does not exceed it.
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
# Data without a non-nil embedded "content" always passes.
74 defp check_remote_limit(_), do: true
# Increments the actor's note count when `object` is public (is_public?/1);
# otherwise returns {:ok, actor} without touching the counter.
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Decrements the actor's note count when `object` is public (is_public?/1);
# otherwise returns {:ok, actor} without touching the counter.
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# Bumps the replies counter of the replied-to object when the created object
# carries an "inReplyTo" and is public. Part of the pattern in the function
# head is not visible in this listing.
84 defp increase_replies_count_if_reply(%{
85 "object" => %{"inReplyTo" => reply_ap_id} = object,
88 if is_public?(object) do
89 Object.increase_replies_count(reply_ap_id)
# Anything that does not match the head above is a no-op.
93 defp increase_replies_count_if_reply(_create_data), do: :noop
# Object types that persist/2 stores through Object.create/1.
95 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
# persist/2, clause 1: data whose "type" is one of @object_types is created as
# an Object. The success branch is not visible in this listing.
97 def persist(%{"type" => type} = object, meta) when type in @object_types do
98 with {:ok, object} <- Object.create(object) do
# persist/2, clause 2: everything else is inserted as an %Activity{} row.
# `meta` must contain :local (Keyword.fetch!/2 raises otherwise); recipients
# come from get_recipients/1 and the actor from object["actor"]. On success the
# result is {:ok, activity, meta}.
104 def persist(object, meta) do
105 with local <- Keyword.fetch!(meta, :local),
106 {recipients, _, _} <- get_recipients(object),
108 Repo.insert(%Activity{
111 recipients: recipients,
112 actor: object["actor"]
114 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
115 {:ok, _} <- maybe_create_activity_expiration(activity) do
116 {:ok, activity, meta}
# Inserts an activity map into the database.
#
# Arguments: `map` is the activity data; `local`, `fake` and
# `bypass_actor_check` are flags (defaults: true, false, false).
#
# The `with` chain, in order:
#   * Activity.normalize/1 must return nil; a returned %Activity{} is handled
#     by the `%Activity{} = activity ->` branch further down
#   * defaults are filled in by lazy_put_activity_defaults/2
#   * the actor check runs unless bypass_actor_check is truthy
#   * the remote content limit is checked; failure yields {:error, :remote_limit}
#   * MRF.filter/1 must return {:ok, map}
#   * when `fake` is true the chain stops at the :fake step and the
#     {:fake, true, map, recipients} branch builds an %Activity{} struct
#   * containment check, object insertion, then activity insertion
# Several branch bodies are not visible in this listing.
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
125 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
# Plain `=` match: a result of another shape raises instead of reaching `else`.
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map),
131 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
132 # Splice in the child object if we have one.
133 activity = Maps.put_if_present(activity, :object, object)
# Rich-media data is fetched in a separately started Task, wrapped in
# ConcurrentLimiter.limit/2.
135 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
136 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
141 %Activity{} = activity ->
147 {:containment, _} = error ->
150 {:error, _} = error ->
153 {:fake, true, map, recipients} ->
154 activity = %Activity{
158 recipients: recipients,
# In the fake path the rich-media fetch is called directly, not in a Task.
162 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
165 {:remote_limit_pass, _} ->
166 {:error, :remote_limit}
# Inserts a struct carrying the activity's actor and recipients (its other
# fields are not visible in this listing) and, on success, hands the inserted
# activity to maybe_create_activity_expiration/1.
173 defp insert_activity_with_expiration(data, local, recipients) do
177 actor: data["actor"],
178 recipients: recipients
181 with {:ok, activity} <- Repo.insert(struct) do
182 maybe_create_activity_expiration(activity)
# Post-insert side effects for an activity: creates notifications, creates or
# bumps the conversation on behalf of the activity's actor, and streams the
# conversation's participations.
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
192 stream_out_participations(participations)
# When the activity data holds a DateTime under "expires_at", enqueues a
# Pleroma.Workers.PurgeExpiredActivity job with the activity id and expiry.
# The rest of this clause is not visible in this listing.
195 defp maybe_create_activity_expiration(
196 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
199 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
200 activity_id: activity.id,
201 expires_at: expires_at
# Activities without a DateTime "expires_at" are returned as {:ok, activity}.
207 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity`; when `actor` resolves to a
# cached user, the conversation is marked as read for that user. The return
# value is on lines not visible in this listing (get_participations/1 expects
# {:ok, conversation} on success).
209 defp create_or_bump_conversation(activity, actor) do
210 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
211 %User{} = user <- User.get_cached_by_ap_id(actor) do
212 Participation.mark_as_read(user, conversation)
# For {:ok, conversation}: force-reloads the :participations association and
# returns it.
217 defp get_participations({:ok, conversation}) do
219 |> Repo.preload(:participations, force: true)
220 |> Map.get(:participations)
# Any other input yields no participations.
223 defp get_participations(_), do: []
# Streams the given participations on the "participation" topic; a
# Repo.preload(:user) step is applied first (the pipeline start is not visible
# in this listing).
225 def stream_out_participations(participations) do
228 |> Repo.preload(:user)
230 Streamer.stream("participation", participations)
# Two-argument variant: looks up the conversation for the object's "context",
# preloads its participations, and streams them when
# fetch_latest_direct_activity_id_for_context/2 yields an id. The options passed
# to that lookup are not visible in this listing.
234 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
235 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
236 conversation = Repo.preload(conversation, :participations)
239 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
# Objects without a "context" (or any other first argument) are ignored.
251 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by Topics.get_activity_topics/1.
254 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
255 when data_type in ["Create", "Announce", "Delete"] do
257 |> Topics.get_activity_topics()
258 |> Streamer.stream(activity)
# Other activities are not streamed (clause body not visible in this listing).
262 def stream_out(_activity) do
# Creates an activity from `params` by running do_create/2 inside
# Repo.transaction/1. `fake` defaults to false. The handling of the
# transaction result is not visible in this listing.
266 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
267 def create(params, fake \\ false) do
268 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Builds the Create activity data from `params` and inserts it; called inside
# the transaction opened by create/2.
#   * params[:local] counts as false only when it is exactly `false`
#   * quick_insert? is true when Config [:env] is :benchmark; the chain then
#     stops after the reply counter update ({:quick_insert, true, _} branch)
#   * a fake insert stops right after insert/3 ({:fake, true, _} branch)
#   * otherwise note count, notifications/streaming, poll notifications and
#     federation follow
# The last visible branch rolls the transaction back via Repo.rollback/1.
273 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
274 additional = params[:additional] || %{}
275 # only accept false as false value
276 local = !(params[:local] == false)
277 published = params[:published]
278 quick_insert? = Config.get([:env]) == :benchmark
282 %{to: to, actor: actor, published: published, context: context, object: object},
286 with {:ok, activity} <- insert(create_data, local, fake),
287 {:fake, false, activity} <- {:fake, fake, activity},
288 _ <- increase_replies_count_if_reply(create_data),
289 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
290 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
291 _ <- notify_and_stream(activity),
292 :ok <- maybe_schedule_poll_notifications(activity),
293 :ok <- maybe_federate(activity) do
296 {:quick_insert, true, activity} ->
299 {:fake, true, activity} ->
303 Repo.rollback(message)
# Asks PollWorker to schedule the poll-end job for the activity. do_create/2
# expects :ok from this function; the line producing that value is not visible
# in this listing.
307 defp maybe_schedule_poll_notifications(activity) do
308 PollWorker.schedule_poll_end(activity)
# Creates a "listen" activity from `params`: builds the activity data (the
# builder call is not visible in this listing), inserts it, then notifies,
# streams and federates. params[:local] counts as false only when it is exactly
# `false`.
312 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
313 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
314 additional = params[:additional] || %{}
315 # only accept false as false value
316 local = !(params[:local] == false)
317 published = params[:published]
321 %{to: to, actor: actor, published: published, context: context, object: object},
325 with {:ok, activity} <- insert(listen_data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
# Unfollows `followed` on behalf of `follower` by running do_unfollow/4 inside
# Repo.transaction/1. `activity_id` defaults to nil and `local` to true. The
# handling of the transaction result is not visible in this listing.
332 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
333 {:ok, Activity.t()} | nil | {:error, any()}
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with {:ok, result} <-
336 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of unfollow/4: finds the latest follow activity, sets its
# state to "cancelled", builds and inserts the unfollow activity, then
# notifies, streams and federates. An {:error, error} result rolls the
# transaction back with that error.
341 defp do_unfollow(follower, followed, activity_id, local) do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
344 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
345 {:ok, activity} <- insert(unfollow_data, local),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
351 {:error, error} -> Repo.rollback(error)
# flag/1 runs do_flag/1 inside Repo.transaction/1. The `def` line and the
# handling of the transaction result are not visible in this listing.
355 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
357 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# Body of the report ("flag") creation; the function head is not visible in
# this listing.
371 # only accept false as false value
372 local = !(params[:local] == false)
373 forward = !(params[:forward] == false)
375 additional = params[:additional] || %{}
# Two addressing variants are merged into `additional`: one with the reported
# account's ap_id in "cc", one with an empty "cc". The condition choosing
# between them is not visible (NOTE(review): `forward` is the likely selector —
# confirm).
379 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
381 Map.merge(additional, %{"to" => [], "cc" => []})
# The flag activity is inserted, a copy stripped of report status data is
# produced, and that stripped copy is what gets passed to maybe_federate/1.
384 with flag_data <- make_flag_data(params, additional),
385 {:ok, activity} <- insert(flag_data, local),
386 {:ok, stripped_activity} <- strip_report_status_data(activity),
387 _ <- notify_and_stream(activity),
389 maybe_federate(stripped_activity) do
# Every superuser other than the reporting actor that has an email address is
# sent an admin report email, delivered asynchronously.
390 User.all_superusers()
391 |> Enum.filter(fn user -> user.ap_id != actor end)
392 |> Enum.filter(fn user -> not is_nil(user.email) end)
393 |> Enum.each(fn superuser ->
395 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
396 |> Pleroma.Emails.Mailer.deliver_async()
401 {:error, error} -> Repo.rollback(error)
# Creates a Move activity from `origin` to `target`.
# Requires origin.ap_id to be listed in target.also_known_as; otherwise returns
# {:error, "Target account must have the origin in `alsoKnownAs`"}.
# After insertion the activity is notified/streamed and federated, and a
# "move_following" background job is enqueued with both user ids.
405 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
406 def move(%User{} = origin, %User{} = target, local \\ true) do
409 "actor" => origin.ap_id,
410 "object" => origin.ap_id,
411 "target" => target.ap_id
414 with true <- origin.ap_id in target.also_known_as,
415 {:ok, activity} <- insert(params, local),
416 _ <- notify_and_stream(activity) do
417 maybe_federate(activity)
419 BackgroundWorker.enqueue("move_following", %{
420 "origin_id" => origin.id,
421 "target_id" => target.id
426 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the Ecto query for activities that belong to `context`.
# The recipient list always ends with the public address; in the branch shown
# it also contains the requesting user's ap_id and the users they follow (the
# surrounding condition is not visible in this listing).
# The query applies preloads, block/blocker/recipient/filter restrictions,
# matches on activity type and context through a fragment (its bound values
# are not visible), excludes poll votes and orders by activity id descending.
431 def fetch_activities_for_context_query(context, opts) do
432 public = [Constants.as_public()]
436 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
439 from(activity in Activity)
440 |> maybe_preload_objects(opts)
441 |> maybe_preload_bookmarks(opts)
442 |> maybe_set_thread_muted_field(opts)
443 |> restrict_blocked(opts)
444 |> restrict_blockers_visibility(opts)
445 |> restrict_recipients(recipients, opts[:user])
446 |> restrict_filtered(opts)
450 "?->>'type' = ? and ?->>'context' = ?",
457 |> exclude_poll_votes(opts)
459 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using
# fetch_activities_for_context_query/2. The start and end of the pipeline are
# not visible in this listing.
462 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
463 def fetch_activities_for_context(context, opts \\ %{}) do
465 |> fetch_activities_for_context_query(opts)
# Returns the id of the latest direct-visibility activity in `context`, or nil
# (per the @spec). Preloading is skipped by default (`skip_preload: true` is
# merged first, so a value in `opts` overrides it). The final selection steps
# are not visible in this listing.
469 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
470 FlakeId.Ecto.CompatType.t() | nil
471 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
473 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
474 |> restrict_visibility(%{visibility: "direct"})
# Paginates `query` with :skip_extra_order set, so Pagination does not add its
# own extra ordering on top of the query's ordering.
480 defp fetch_paginated_optimized(query, opts, pagination) do
481 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
482 # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
483 opts = Map.put(opts, :skip_extra_order, true)
485 Pagination.fetch_paginated(query, opts, pagination)
# Fetches a page of activities addressed to `recipients`.
# The list memberships of opts[:user] are appended to the recipients, the
# query is paginated (default :keyset), and maybe_update_cc/3 post-processes
# the results. One pipeline step is not visible in this listing.
488 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
489 list_memberships = Pleroma.List.memberships(opts[:user])
491 fetch_activities_query(recipients ++ list_memberships, opts)
492 |> fetch_paginated_optimized(opts, pagination)
494 |> maybe_update_cc(list_memberships, opts[:user])
# Fetches activities addressed to the public address. The :user option is
# dropped before querying; unlisted activities are excluded only when
# opts has `restrict_unlisted: true` (see restrict_unlisted/2).
497 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
498 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
499 opts = Map.delete(opts, :user)
501 [Constants.as_public()]
502 |> fetch_activities_query(opts)
503 |> restrict_unlisted(opts)
504 |> fetch_paginated_optimized(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 with :restrict_unlisted forced
# to true.
507 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
508 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
510 |> Map.put(:restrict_unlisted, true)
511 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
514 @valid_visibilities ~w[direct unlisted public private]
# List form: when every entry is valid, keeps activities whose
# activity_visibility(...) is any of the given values; otherwise logs an error.
# NOTE(review): `visibility` is a list here, so the interpolation in the log
# message concatenates its elements (and raises for elements that are not
# chardata) — inspect/1 would be safer.
516 defp restrict_visibility(query, %{visibility: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
523 "activity_visibility(?, ?, ?) = ANY (?)",
531 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: keeps activities with exactly that visibility.
535 defp restrict_visibility(query, %{visibility: visibility})
536 when visibility in @valid_visibilities do
540 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Single invalid visibility: logs an error. The query argument is ignored by
# this clause; the value it returns is not visible in this listing.
544 defp restrict_visibility(_query, %{visibility: visibility})
545 when visibility not in @valid_visibilities do
546 Logger.error("Could not restrict visibility to #{visibility}")
# No :visibility option: query unchanged.
549 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by :exclude_visibilities.
# List form: when every entry is valid, a fragment comparing
# activity_visibility(...) against the list is applied (the negation is on
# lines not visible in this listing); otherwise an error is logged.
# NOTE(review): `visibility` is a list here, so the interpolation in the log
# message concatenates its elements — inspect/1 would be safer.
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when is_list(visibility) do
553 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
558 "activity_visibility(?, ?, ?) = ANY (?)",
566 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
571 defp exclude_visibility(query, %{exclude_visibilities: visibility})
572 when visibility in @valid_visibilities do
577 "activity_visibility(?, ?, ?) = ?",
# A value that is neither nil nor a valid visibility: logs an error.
586 defp exclude_visibility(query, %{exclude_visibilities: visibility})
587 when visibility not in [nil | @valid_visibilities] do
588 Logger.error("Could not exclude visibility to #{visibility}")
# No (or nil) :exclude_visibilities option: query unchanged.
592 defp exclude_visibility(query, _visibility), do: query
# Thread containment filter. Arguments: query, opts, config.
# Clauses 1 and 2 match when containment is skipped through config
# (`skip_thread_containment: true`) or through the user's own
# `skip_thread_containment` flag; their bodies are not visible in this listing
# (presumably the query is returned unchanged — confirm).
594 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
597 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
# With a user: keeps activities for which the SQL function
# thread_visibility(user ap_id, activity id) is true.
600 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
603 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
# Without a user: query unchanged.
607 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` as seen by `reading_user`.
# :user is set to the reading user and :actor_id to the author's ap_id;
# recipients are derived from params[:godmode] and the reading user via
# user_activities_recipients/1. Parts of the pipeline are not visible in this
# listing.
609 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
612 |> Map.put(:user, reading_user)
613 |> Map.put(:actor_id, user.ap_id)
616 godmode: params[:godmode],
617 reading_user: reading_user
619 |> user_activities_recipients()
620 |> fetch_activities(params)
# Fetches the statuses of `user` as seen by `reading_user`.
# Bodiless head declaring the default for `params`.
624 def fetch_user_activities(user, reading_user, params \\ %{})
# With `total: true` the result is a keyword list; its :items are reversed.
626 def fetch_user_activities(user, reading_user, %{total: true} = params) do
627 result = fetch_activities_for_user(user, reading_user, params)
629 Keyword.put(result, :items, Enum.reverse(result[:items]))
# Without totals: the rest of the pipeline is not visible in this listing.
632 def fetch_user_activities(user, reading_user, params) do
634 |> fetch_activities_for_user(reading_user, params)
# Shared implementation for fetch_user_activities/3.
# Restricts to "Create" and "Announce" activities by `user`, passes the keys of
# user.pinned_objects as :pinned_object_ids, and sets :user to the reading user.
# :blocking_user / :muting_user are added in one branch of the
# User.blocks?/2 check (the branch layout is not fully visible in this listing).
# Pagination type comes from params[:pagination_type], defaulting to :keyset.
638 defp fetch_activities_for_user(user, reading_user, params) do
641 |> Map.put(:type, ["Create", "Announce"])
642 |> Map.put(:user, reading_user)
643 |> Map.put(:actor_id, user.ap_id)
644 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
647 if User.blocks?(reading_user, user) do
651 |> Map.put(:blocking_user, reading_user)
652 |> Map.put(:muting_user, reading_user)
655 pagination_type = Map.get(params, :pagination_type) || :keyset
658 godmode: params[:godmode],
659 reading_user: reading_user
661 |> user_activities_recipients()
662 |> fetch_activities(params, pagination_type)
# Fetches statuses visible to `reading_user`.
# With `total: true` the result is a keyword list; its :items are reversed.
665 def fetch_statuses(reading_user, %{total: true} = params) do
666 result = fetch_activities_for_reading_user(reading_user, params)
667 Keyword.put(result, :items, Enum.reverse(result[:items]))
# Without totals: the rest of the pipeline is not visible in this listing.
670 def fetch_statuses(reading_user, params) do
672 |> fetch_activities_for_reading_user(params)
# Shared implementation for fetch_statuses/2: restricts to "Create" and
# "Announce" activities, derives recipients from params[:godmode] and the
# reading user, and always uses :offset pagination.
676 defp fetch_activities_for_reading_user(reading_user, params) do
677 params = Map.put(params, :type, ["Create", "Announce"])
680 godmode: params[:godmode],
681 reading_user: reading_user
683 |> user_activities_recipients()
684 |> fetch_activities(params, :offset)
# Recipient list used when reading a user's activities.
# godmode: no recipient restriction (empty list; see restrict_recipients/3).
687 defp user_activities_recipients(%{godmode: true}), do: []
# Otherwise either the public address plus the reading user's ap_id and the
# users they follow, or only the public address. The condition choosing between
# the two is not visible in this listing (presumably whether a reading user is
# present — confirm).
689 defp user_activities_recipients(%{reading_user: reading_user}) do
691 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
693 [Constants.as_public()]
# Filter driven by :announce_filtering_user; needs the joined object.
# Combining it with `skip_preload: true` raises.
697 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
698 raise "Can't use the child object without preloading!"
# Keeps rows matching "type != ? or actor != ?"; the bound values are not
# visible in this listing (NOTE(review): presumably this drops Announces of the
# filtering user's own objects — confirm).
701 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
703 [activity, object] in query,
706 "?->>'type' != ? or ?->>'actor' != ?",
# Option absent: query unchanged.
715 defp restrict_announce_object_actor(query, _), do: query
# Keeps activities with an id greater than :since_id. An empty-string
# since_id, or no since_id at all, leaves the query unchanged.
717 defp restrict_since(query, %{since_id: ""}), do: query
719 defp restrict_since(query, %{since_id: since_id}) do
720 from(activity in query, where: activity.id > ^since_id)
723 defp restrict_since(query, _), do: query
# :tag_all filter on the tags embedded in the object's JSON data.
# Needs the joined object, so `skip_preload: true` raises.
725 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
726 raise_on_missing_preload()
# Non-empty list: the object's 'tag' array must contain all given tags
# (PostgreSQL jsonb `?&` operator).
729 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
731 [_activity, object] in query,
732 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
# A single binary tag is delegated to the "any" filter.
736 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
737 restrict_embedded_tag_any(query, %{tag: tag})
# Option absent (or empty list): query unchanged.
740 defp restrict_embedded_tag_all(query, _), do: query
# :tag filter on the tags embedded in the object's JSON data.
# Needs the joined object, so `skip_preload: true` raises.
742 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
743 raise_on_missing_preload()
# Non-empty list: the object's 'tag' array must contain at least one of the
# given tags (PostgreSQL jsonb `?|` operator).
746 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
748 [_activity, object] in query,
749 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
# A single binary tag is wrapped in a list and handled by the clause above.
753 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
754 restrict_embedded_tag_any(query, %{tag: [tag]})
# Option absent (or empty list): query unchanged.
757 defp restrict_embedded_tag_any(query, _), do: query
# :tag_reject filter on the tags embedded in the object's JSON data.
# Needs the joined object, so `skip_preload: true` raises.
759 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
760 raise_on_missing_preload()
# Non-empty list: rejects objects whose 'tag' array contains any of the tags.
763 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
765 [_activity, object] in query,
766 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
# A single binary tag is wrapped in a list and handled by the clause above.
770 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
771 when is_binary(tag_reject) do
772 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
# Option absent (or empty list): query unchanged.
775 defp restrict_embedded_tag_reject_any(query, _), do: query
# Query selecting the distinct object ids linked (through the
# "hashtags_objects" table) to any hashtag whose name is in `tags`.
777 defp object_ids_query_for_tags(tags) do
778 from(hto in "hashtags_objects")
779 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
780 |> where([hto, ht], ht.name in ^tags)
781 |> select([hto], hto.object_id)
782 |> distinct([hto], true)
# :tag_all filter based on the hashtags tables.
# Needs the joined object, so `skip_preload: true` raises.
785 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
786 raise_on_missing_preload()
# Exactly one tag: "all" and "any" are equivalent, so the "any" filter is used.
789 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
790 restrict_hashtag_any(query, %{tag: single_tag})
# Several tags: a SQL subquery aggregates the object's matching hashtag names
# and checks with `@>` that the aggregate contains a bound value (the bound
# arguments are not visible in this listing).
793 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
795 [_activity, object] in query,
799 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
800 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
801 AND hashtags_objects.object_id = ?) @> ?
# A single binary tag is wrapped in a list (and then handled as one tag).
810 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
811 restrict_hashtag_all(query, %{tag_all: [tag]})
# Option absent (or empty list): query unchanged.
814 defp restrict_hashtag_all(query, _), do: query
# :tag filter based on the hashtags tables.
# Needs the joined object, so `skip_preload: true` raises.
816 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
817 raise_on_missing_preload()
# Non-empty list: hashtag ids are looked up by name (how that query is run is
# not visible in this listing), then objects are joined to "hashtags_objects"
# and limited to those ids. The clause also sets DISTINCT and ORDER BY on
# object.id descending.
820 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
822 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
825 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
827 [_activity, object] in query,
828 join: hto in "hashtags_objects",
829 on: hto.object_id == object.id,
830 where: hto.hashtag_id in ^hashtag_ids,
831 distinct: [desc: object.id],
832 order_by: [desc: object.id]
# A single binary tag is wrapped in a list and handled by the clause above.
836 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
837 restrict_hashtag_any(query, %{tag: [tag]})
# Option absent (or empty list): query unchanged.
840 defp restrict_hashtag_any(query, _), do: query
# :tag_reject filter based on the hashtags tables.
# Needs the joined object, so `skip_preload: true` raises.
842 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
843 raise_on_missing_preload()
# Non-empty list: drops objects whose id appears in the subquery of objects
# carrying any of the rejected tags (object_ids_query_for_tags/1).
846 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
848 [_activity, object] in query,
849 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
# A single binary tag is wrapped in a list and handled by the clause above.
853 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
854 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
# Option absent (or empty list): query unchanged.
857 defp restrict_hashtag_reject_any(query, _), do: query
# Raises a RuntimeError; used by filters that need the joined object when the
# caller passed `skip_preload: true`.
859 defp raise_on_missing_preload do
860 raise "Can't use the child object without preloading!"
# Restricts the query to activities addressed to any of `recipients`.
# Empty recipient list: no restriction.
863 defp restrict_recipients(query, [], _user), do: query
# No user: the activity's recipients must overlap the given list (array `&&`).
865 defp restrict_recipients(query, recipients, nil) do
866 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: recipients must overlap, or the user is the activity's actor.
# NOTE(review): Ecto's or_where combines with all previously accumulated where
# expressions using OR, so the position of this filter in a pipeline matters.
869 defp restrict_recipients(query, recipients, user) do
872 where: fragment("? && ?", ^recipients, activity.recipients),
873 or_where: activity.actor == ^user.ap_id
# `local_only: true` keeps only local activities; otherwise no restriction.
877 defp restrict_local(query, %{local_only: true}) do
878 from(activity in query, where: activity.local == true)
881 defp restrict_local(query, _), do: query
# `remote: true` keeps only non-local activities; otherwise no restriction.
883 defp restrict_remote(query, %{remote: true}) do
884 from(activity in query, where: activity.local == false)
887 defp restrict_remote(query, _), do: query
# With :actor_id, keeps only activities whose actor equals it; otherwise no
# restriction.
889 defp restrict_actor(query, %{actor_id: actor_id}) do
890 from(activity in query, where: activity.actor == ^actor_id)
893 defp restrict_actor(query, _), do: query
# Filters on the activity's "type".
# Binary :type — exact match.
895 defp restrict_type(query, %{type: type}) when is_binary(type) do
896 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
# Any other :type value is bound as the array for `= ANY(?)`
# (callers in this file pass a list of type names).
899 defp restrict_type(query, %{type: type}) do
900 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
# No :type option: query unchanged.
903 defp restrict_type(query, _), do: query
# With :state, keeps only activities whose data "state" equals it; otherwise
# no restriction.
905 defp restrict_state(query, %{state: state}) do
906 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
909 defp restrict_state(query, _), do: query
# With :favorited_by, keeps only activities whose object's 'likes' JSON array
# contains the given ap_id (jsonb `?` operator); otherwise no restriction.
911 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
913 [_activity, object] in query,
914 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
918 defp restrict_favorited_by(query, _), do: query
# :only_media filter. It needs the joined object, so any :only_media value
# combined with `skip_preload: true` raises.
920 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
921 raise "Can't use the child object without preloading!"
# `only_media: true`: keeps "Create" activities whose object's 'attachment' is
# not equal to the bound empty list.
924 defp restrict_media(query, %{only_media: true}) do
926 [activity, object] in query,
927 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
928 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
# Otherwise: query unchanged.
932 defp restrict_media(query, _), do: query
# Reply filtering.
# `exclude_replies: true`: keeps only objects without an "inReplyTo".
934 defp restrict_replies(query, %{exclude_replies: true}) do
936 [_activity, object] in query,
937 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self": keeps non-replies, plus rows for which a bound value
# is `= ANY` of a bound array (the bound arguments are not visible in this
# listing; presumably the filtering user's ap_id among the recipients — confirm).
941 defp restrict_replies(query, %{
942 reply_filtering_user: %User{} = user,
943 reply_visibility: "self"
946 [activity, object] in query,
949 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following": the conditions are spelled out in the SQL
# comments inside the fragment below; the friend list bound to it is the
# user's own ap_id plus User.get_cached_user_friends_ap_ids/1.
957 defp restrict_replies(query, %{
958 reply_filtering_user: %User{} = user,
959 reply_visibility: "following"
962 [activity, object] in query,
966 ?->>'type' != 'Create' -- This isn't a Create
967 OR ?->>'inReplyTo' is null -- this isn't a reply
968 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
969 -- unless they are the author (because authors
970 -- are also part of the recipients). This leads
971 -- to a bug that self-replies by friends won't
973 OR ? = ? -- The actor is us
977 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
# No reply-related option: query unchanged.
986 defp restrict_replies(query, _), do: query
# `exclude_reblogs: true` drops "Announce" activities; otherwise no
# restriction.
988 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
989 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
992 defp restrict_reblogs(query, _), do: query
# Mute filtering.
# `with_muted: true`: muted content is kept, query unchanged.
994 defp restrict_muted(query, %{with_muted: true}), do: query
# With a muting user: the muted ap_ids come from opts[:muted_users_ap_ids] when
# given, otherwise from User.muted_users_ap_ids/1. Activities by muted actors
# are dropped; a second fragment on the activity's 'to' field is applied (its
# bound arguments are not visible in this listing). Unless preloading is
# skipped, rows with a thread-mute entry (non-nil thread_mute user_id) are
# dropped as well.
996 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
997 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1000 from([activity] in query,
1001 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1004 "not (?->'to' \\?| ?) or ? = ?",
1012 unless opts[:skip_preload] do
1013 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
# No muting user: query unchanged.
1019 defp restrict_muted(query, _), do: query
# Block filtering for a :blocking_user.
# Blocked ap_ids come from opts[:blocked_users_ap_ids] when given, otherwise
# from User.blocked_users_ap_ids/1; blocked domains from user.domain_blocks.
# The object is joined first if the query has no :object binding yet. Each
# where-condition is described by the comment above it; most bound arguments
# are not visible in this listing.
1021 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1022 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1023 domain_blocks = user.domain_blocks || []
1025 following_ap_ids = User.get_friends_ap_ids(user)
1028 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1031 [activity, object: o] in query,
1032 # You don't block the author
1033 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1035 # You don't block any recipients, and didn't author the post
1038 "((not (? && ?)) or ? = ?)",
1039 activity.recipients,
1045 # You don't block the domain of any recipients, and didn't author the post
1048 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1049 activity.recipients,
1055 # It's not a boost of a user you block
1058 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1064 # You don't block the author's domain, and also don't follow the author
1067 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1074 # Same as above, but checks the Object
1077 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
# No blocking user: query unchanged.
1086 defp restrict_blocked(query, _), do: query
# Hides content from users who block the :blocking_user, depending on the
# [:activitypub, :blockers_visible] setting. The branch bodies are only partly
# visible in this listing; the filtering code shown uses the ap_ids of users
# with an incoming :block relationship towards `user`.
1088 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1089 if Config.get([:activitypub, :blockers_visible]) == true do
1092 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1096 # The author doesn't block you
1097 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1099 # It's not a boost of a user that blocks you
1102 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
# No blocking user: query unchanged.
1111 defp restrict_blockers_visibility(query, _), do: query
# `restrict_unlisted: true`: drops rows whose 'cc' (treated as empty when
# missing) contains the public address. The expression the fragment is applied
# to is not visible in this listing. Otherwise: query unchanged.
1113 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1118 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1120 ^[Constants.as_public()]
1125 defp restrict_unlisted(query, _), do: query
# `pinned: true` together with :pinned_object_ids: keeps "Create" rows whose
# object id (embedded 'object'->>'id', falling back to the plain 'object'
# value) is any of a bound array — presumably `ids`; the bound arguments are
# not visible in this listing. Otherwise: query unchanged.
1127 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1129 [activity, object: o] in query,
1132 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1141 defp restrict_pinned(query, _), do: query
# With a muting user: drops "Announce" rows matching a bound array —
# presumably reblogs by users in `muted_reblogs`; the bound arguments are not
# visible in this listing. The list comes from opts[:reblog_muted_users_ap_ids]
# when given, otherwise from User.reblog_muted_users_ap_ids/1.
# Without a muting user: query unchanged.
1143 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1144 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1150 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1158 defp restrict_muted_reblogs(query, _), do: query
# With a binary :instance, keeps only activities whose actor URL has that
# value as its third '/'-separated part (the host); otherwise no restriction.
1160 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1163 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1167 defp restrict_instance(query, _), do: query
# Applies the user's content filters: based on the result of
# Filter.compose_regex/1, drops rows whose object "content" matches the regex
# (case-insensitive `~*`), except for the user's own activities. The case
# branches are only partly visible in this listing.
1169 defp restrict_filtered(query, %{user: %User{} = user}) do
1170 case Filter.compose_regex(user) do
1175 from([activity, object] in query,
1177 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1178 activity.actor == ^user.ap_id
# A :blocking_user (without :user) is treated as the filtering user.
1183 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1184 restrict_filtered(query, %{user: user})
# Neither option: query unchanged.
1187 defp restrict_filtered(query, _), do: query
# Poll votes are kept only with `include_poll_votes: true`.
1189 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
# Otherwise, when the query has an :object binding, objects of type "Answer"
# are dropped. The else-branch is not visible in this listing.
1191 defp exclude_poll_votes(query, _) do
1192 if has_named_binding?(query, :object) do
1193 from([activity, object: o] in query,
1194 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Chat messages are kept only with `include_chat_messages: true`.
1201 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
# Otherwise, when the query has an :object binding, objects of type
# "ChatMessage" are dropped. The else-branch is not visible in this listing.
1203 defp exclude_chat_messages(query, _) do
1204 if has_named_binding?(query, :object) do
1205 from([activity, object: o] in query,
1206 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Activities by invisible actors are kept only with `invisible_actors: true`.
1213 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
# Otherwise the ap_ids of invisible users are collected (how the user query is
# run is not visible in this listing) and activities by those actors are
# dropped.
1215 defp exclude_invisible_actors(query, _opts) do
1217 User.Query.build(%{invisible: true, select: [:ap_id]})
1219 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1221 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id, drops the activity with that id; otherwise no
# restriction.
1224 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1225 from(activity in query, where: activity.id != ^id)
1228 defp exclude_id(query, _), do: query
# Preloads the activity's object unless `skip_preload: true` is set.
1230 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1232 defp maybe_preload_objects(query, _) do
1234 |> Activity.with_preloaded_object()
# Preloads the bookmark of opts[:user] unless `skip_preload: true` is set.
1237 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1239 defp maybe_preload_bookmarks(query, opts) do
1241 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes only when `preload_report_notes: true` is set.
1244 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1246 |> Activity.with_preloaded_report_notes()
1249 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for opts[:muting_user] (falling back to
# opts[:user]) unless `skip_preload: true` is set.
1251 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1253 defp maybe_set_thread_muted_field(query, opts) do
1255 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by :id according to opts[:order] (:desc or :asc); any other value
# leaves the query's ordering untouched.
1258 defp maybe_order(query, %{order: :desc}) do
1260 |> order_by(desc: :id)
1263 defp maybe_order(query, %{order: :asc}) do
1265 |> order_by(asc: :id)
1268 defp maybe_order(query, _), do: query
# Normalizes the hashtag options (:tag, :tag_all, :tag_reject) with
# Hashtag.normalize_name/1. A bitstring value is normalized directly; a list
# value is normalized element by element (other steps of the list pipeline,
# and the handling of other values, are not visible in this listing).
1270 defp normalize_fetch_activities_query_opts(opts) do
1271 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1273 value when is_bitstring(value) ->
1274 Map.put(opts, key, Hashtag.normalize_name(value))
1276 value when is_list(value) ->
1279 |> Enum.map(&Hashtag.normalize_name/1)
1282 Map.put(opts, key, normalized_value)
# Preloads the relationship ap_id lists needed by the block/mute filters in
# one call and returns three option maps:
# {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}.
#   * with a :muting_user, :mute and :reblog_mute are loaded
#   * :block is added when :blocking_user is set and equals the muting user
#     (the else-branch of that check is not visible in this listing)
# `opts` is the second argument of each Map.merge/2, so values already present
# in `opts` win over the preloaded ones.
1290 defp fetch_activities_query_ap_ids_ops(opts) do
1291 source_user = opts[:muting_user]
1292 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1294 ap_id_relationships =
1295 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1296 [:block | ap_id_relationships]
1301 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1303 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1304 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1306 restrict_muted_reblogs_opts =
1307 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1309 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query for `recipients`, applying every filter that
# `opts` asks for. Hashtag options are normalized first and the block/mute
# ap_id lists are preloaded once. The start of the pipeline and the
# if/else scaffolding around the hashtag filters are not visible in this
# listing.
1312 def fetch_activities_query(recipients, opts \\ %{}) do
1313 opts = normalize_fetch_activities_query_opts(opts)
1315 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1316 fetch_activities_query_ap_ids_ops(opts)
# NOTE(review): presumably part of the `config` map handed to
# restrict_thread_visibility/3 below. With a literal `true` the first clause of
# that function always matches — confirm this is not meant to be read from
# configuration.
1319 skip_thread_containment: true
1324 |> maybe_preload_objects(opts)
1325 |> maybe_preload_bookmarks(opts)
1326 |> maybe_preload_report_notes(opts)
1327 |> maybe_set_thread_muted_field(opts)
1328 |> maybe_order(opts)
1329 |> restrict_recipients(recipients, opts[:user])
1330 |> restrict_replies(opts)
1331 |> restrict_since(opts)
1332 |> restrict_local(opts)
1333 |> restrict_remote(opts)
1334 |> restrict_actor(opts)
1335 |> restrict_type(opts)
1336 |> restrict_state(opts)
1337 |> restrict_favorited_by(opts)
1338 |> restrict_blocked(restrict_blocked_opts)
1339 |> restrict_blockers_visibility(opts)
1340 |> restrict_muted(restrict_muted_opts)
1341 |> restrict_filtered(opts)
1342 |> restrict_media(opts)
1343 |> restrict_visibility(opts)
1344 |> restrict_thread_visibility(opts, config)
1345 |> restrict_reblogs(opts)
1346 |> restrict_pinned(opts)
1347 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1348 |> restrict_instance(opts)
1349 |> restrict_announce_object_actor(opts)
# NOTE(review): restrict_filtered(opts) is already applied earlier in this
# pipeline; this second application looks redundant — confirm and remove.
1350 |> restrict_filtered(opts)
1351 |> Activity.restrict_deactivated_users()
1352 |> exclude_poll_votes(opts)
1353 |> exclude_chat_messages(opts)
1354 |> exclude_invisible_actors(opts)
1355 |> exclude_visibility(opts)
# Hashtag filtering: the hashtags-table based filters are used when the
# :improved_hashtag_timeline feature is enabled; the embedded-tag filters are
# the alternative.
1357 if Config.feature_enabled?(:improved_hashtag_timeline) do
1359 |> restrict_hashtag_any(opts)
1360 |> restrict_hashtag_all(opts)
1361 |> restrict_hashtag_reject_any(opts)
1364 |> restrict_embedded_tag_any(opts)
1365 |> restrict_embedded_tag_all(opts)
1366 |> restrict_embedded_tag_reject_any(opts)
1371 Fetches the activities a user has favorited, ordered by the id of the corresponding Like activity (descending).
1373 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1374 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
# Narrow the query to "Like" activities by actor, then join the liked object
# and the activity belonging to that object.
1376 |> Activity.Queries.by_actor()
1377 |> Activity.Queries.by_type("Like")
1378 |> Activity.with_joined_object()
1379 |> Object.with_joined_activity()
# The result rows are the joined activities with their object attached;
# pagination_id is taken from the Like, not from the returned activity.
1380 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
# Order by the Like's id, descending, with NULLs last.
1381 |> order_by([like, _, _], desc_nulls_last: like.id)
# skip_order: true is merged into params — presumably so the paginator keeps
# the ordering set above; confirm against Pagination.fetch_paginated.
1382 |> Pagination.fetch_paginated(
1383 Map.merge(params, %{skip_order: true}),
# Adds the user's AP id to the "cc" of activities whose "bcc" mentions one of
# the given list memberships.
#
# This clause only applies when `list_memberships` is a non-empty list and
# the third argument is a %User{}.
1388 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1389 Enum.map(activities, fn
# Only activities with a non-empty "bcc" list are considered here.
1390 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1391 if Enum.any?(bcc, &(&1 in list_memberships)) do
# Prepends the user's AP id to the existing "cc" value.
# NOTE(review): if data["cc"] is absent, update_in/2 passes nil to the
# function and the result would be the improper list [user_ap_id | nil] —
# confirm that "cc" is always a list here.
1392 update_in(activity.data["cc"], &[user_ap_id | &1])
# Fallback: any other argument combination returns the activities unchanged.
1402 defp maybe_update_cc(activities, _, _), do: activities
# Restricts `query` to activities addressed to the given recipients.
#
# An activity is kept when its recipients overlap `recipients`, or when they
# overlap `recipients_with_public` and also contain Constants.as_public().
# The "? && ?" fragment is the PostgreSQL array-overlap operator.
1404 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1405 from(activity in query,
1407 fragment("? && ?", activity.recipients, ^recipients) or
1408 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1409 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities bounded by two recipient lists.
#
# `pagination` defaults to :keyset and is handed to Pagination.fetch_paginated/3
# together with `opts`.
1413 def fetch_activities_bounded(
1415 recipients_with_public,
1417 pagination \\ :keyset
# The base query is built with an empty recipient list; the recipient
# restriction is applied by fetch_activities_bounded_query/3 instead.
1419 fetch_activities_query([], opts)
1420 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1421 |> Pagination.fetch_paginated(opts, pagination)
# Stores an uploaded file and inserts an Object for it.
#
# `opts` is forwarded to Upload.store/2; opts[:actor] is additionally read to
# set the "actor" field of the stored data.
1425 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1426 def upload(file, opts \\ []) do
1427 with {:ok, data} <- Upload.store(file, opts) do
# "actor" is added via Maps.put_if_present/3 (by its name, only when
# opts[:actor] is present — confirm against the helper).
1428 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
# The function's result on success is whatever Repo.insert/1 returns.
1430 Repo.insert(%Object{data: obj_data})
# Extracts a URL string from the different shapes an actor's "url" may take.
1434 @spec get_actor_url(any()) :: binary() | nil
# A plain string is returned as-is.
1435 defp get_actor_url(url) when is_binary(url), do: url
# A map with a string "href" yields that href.
1436 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
# A list has its own clause.
1438 defp get_actor_url(url) when is_list(url) do
# Anything else yields nil.
1444 defp get_actor_url(_url), do: nil
# Normalizes an image value (used for the actor's "icon" and "image" in
# object_to_user_data/1).
#
# A map with a "url" key is rebuilt with the URL wrapped as
# "url" => [%{"href" => url}].
1446 defp normalize_image(%{"url" => url}) do
1449 "url" => [%{"href" => url}]
# A list is normalized through its first element only; an empty list gives
# List.first/1 => nil, which falls to the catch-all clause below.
1453 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
# Any other value (including nil) normalizes to nil.
1454 defp normalize_image(_), do: nil
# Converts a remote actor object (string-keyed map) into the atom-keyed
# attribute map used to create or update a remote user.
1456 defp object_to_user_data(data) do
# Profile fields: "attachment" entries of type "PropertyValue", reduced to
# their "name" and "value" keys.
# NOTE(review): the filter function has a single clause, so an attachment
# without a "type" key would raise FunctionClauseError — confirm inputs.
1459 |> Map.get("attachment", [])
1460 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1461 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: "tag" entries of type "Emoji", turned into a map of
# shortcode (name with surrounding ":" trimmed) => icon URL.
# NOTE(review): Map.new/2 here also uses a single-clause function, so an
# Emoji tag lacking "icon"/"url" or "name" would raise — confirm inputs.
1465 |> Map.get("tag", [])
1467 %{"type" => "Emoji"} -> true
1470 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1471 {String.trim(name, ":"), url}
# Scalar attributes; missing boolean flags default to false.
1474 is_locked = data["manuallyApprovesFollowers"] || false
1475 capabilities = data["capabilities"] || %{}
1476 accepts_chat_messages = capabilities["acceptsChatMessages"]
# From here on `data` is the result of Transmogrifier.maybe_fix_user_object/1;
# the values read above come from the unmodified input.
1477 data = Transmogrifier.maybe_fix_user_object(data)
1478 is_discoverable = data["discoverable"] || false
1479 invisible = data["invisible"] || false
1480 actor_type = data["type"] || "Person"
1482 featured_address = data["featured"]
# Hard match: anything other than {:ok, _} from the featured fetch raises MatchError.
1483 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
# The public key is read only when "publicKey" is a map holding a string "publicKeyPem".
1486 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1487 data["publicKey"]["publicKeyPem"]
# Likewise the shared inbox requires "endpoints" to be a map with a string "sharedInbox".
1493 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1494 data["endpoints"]["sharedInbox"]
# Entries of the resulting user data map.
1501 uri: get_actor_url(data["url"]),
1503 banner: normalize_image(data["image"]),
1506 is_locked: is_locked,
1507 is_discoverable: is_discoverable,
1508 invisible: invisible,
1509 avatar: normalize_image(data["icon"]),
1511 follower_address: data["followers"],
1512 following_address: data["following"],
1513 featured_address: featured_address,
1514 bio: data["summary"] || "",
1515 actor_type: actor_type,
1516 also_known_as: Map.get(data, "alsoKnownAs", []),
1517 public_key: public_key,
1518 inbox: data["inbox"],
1519 shared_inbox: shared_inbox,
1520 accepts_chat_messages: accepts_chat_messages,
1521 pinned_objects: pinned_objects
1524 # nickname can be nil because of virtual actors
1525 if data["preferredUsername"] do
# The nickname is built as "<preferredUsername>@<host of the actor id>".
1529 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1532 Map.put(user_data, :nickname, nil)
# Fetches the remote following and follower collections of `user` and derives
# counters and privacy flags from them.
#
# `user` only needs to expose `following_address` and `follower_address`.
1536 def fetch_follow_information_for_user(user) do
1537 with {:ok, following_data} <-
1538 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1539 {:ok, hide_follows} <- collection_private(following_data),
1540 {:ok, followers_data} <-
1541 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1542 {:ok, hide_followers} <- collection_private(followers_data) do
# Counters come from each collection's "totalItems"; non-integer values
# become 0 via normalize_counter/1.
1545 hide_follows: hide_follows,
1546 follower_count: normalize_counter(followers_data["totalItems"]),
1547 following_count: normalize_counter(following_data["totalItems"]),
1548 hide_followers: hide_followers
# Error tuples from any step are passed through unchanged.
1551 {:error, _} = e -> e
# Coerces a collection counter to an integer: integers pass through,
# anything else (nil, strings, ...) becomes 0.
1556 defp normalize_counter(counter) when is_integer(counter), do: counter
1557 defp normalize_counter(_), do: 0
# Enriches `user_data` with follower/following information fetched from the
# remote instance, when all preconditions below hold.
1559 def maybe_update_follow_information(user_data) do
# Gate 1: the [:instance, :external_user_synchronization] setting must be true.
1560 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# Gate 2: only "Person" and "Service" actors are considered.
# NOTE(review): object_to_user_data/1 stores the actor type under
# :actor_type, not :type; if this function receives that map, this check
# would always be false — confirm which key callers provide.
1561 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
# Gate 3: both collection addresses must be present.
1563 {:collections_available,
1564 !!(user_data[:following_address] && user_data[:follower_address])},
1566 fetch_follow_information_for_user(user_data) do
# Fetched values are merged over any existing user_data[:info].
1567 info = Map.merge(user_data[:info] || %{}, info)
1570 |> Map.put(:info, info)
# Each failed gate has its own clause below.
1572 {:user_type_check, false} ->
1575 {:collections_available, false} ->
1578 {:enabled, false} ->
# Any other failure is logged together with the actor's AP id.
1583 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a remote collection is private, returning {:ok, boolean}
# or an error tuple.
#
# Clause 1: "first" is an embedded page object of type CollectionPage or
# OrderedCollectionPage.
1590 defp collection_private(%{"first" => %{"type" => type}})
1591 when type in ["CollectionPage", "OrderedCollectionPage"],
# Clause 2: "first" has any other shape (e.g. a reference); it is fetched and
# must resolve to a page of one of the two page types.
1594 defp collection_private(%{"first" => first}) do
1595 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1596 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# An HTTP 401/403 response when fetching the first page is treated as private.
1599 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
# Other error tuples are returned unchanged.
1600 {:error, _} = e -> e
# Clause 3: data without a "first" key is treated as private.
1605 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through MRF.filter/1 and, when that returns
# {:ok, data}, converts the filtered object with object_to_user_data/1.
# Returns {:ok, user_data} on success.
1607 def user_data_from_user_object(data) do
1608 with {:ok, data} <- MRF.filter(data) do
1609 {:ok, object_to_user_data(data)}
# Fetches the remote actor at `ap_id` and turns it into user data, optionally
# enriched with follow information.
#
# Returns {:ok, user_data} on success. Failures are logged at a level that
# depends on the cause.
1615 def fetch_and_prepare_user_from_ap_id(ap_id) do
1616 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1617 {:ok, data} <- user_data_from_user_object(data) do
1618 {:ok, maybe_update_follow_information(data)}
1620 # If this has been deleted, only log a debug and not an error
1621 {:error, "Object has been deleted" = e} ->
1622 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# A {:reject, reason} error is logged at info level.
1625 {:error, {:reject, reason} = e} ->
1626 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
# Everything else is logged as an error.
1630 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees up a nickname that is already taken by a different, older user.
#
# Proceeds only when data[:nickname] is a string, a user with that nickname
# exists, and that user's ap_id differs from data[:ap_id].
1635 def maybe_handle_clashing_nickname(data) do
1636 with nickname when is_binary(nickname) <- data[:nickname],
1637 %User{} = old_user <- User.get_by_nickname(nickname),
1638 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1640 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
# The old user's nickname becomes "<old user id>.<old nickname>".
1644 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1645 |> User.update_and_set_cache()
# Same ap_id on both sides: nothing is renamed, only a message is emitted.
1647 {:ap_id_comparison, true} ->
1649 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Builds the pinned-objects map from a featured collection.
#
# Accepts a map whose `type` is "OrderedCollection" or "Collection" and that
# has an "orderedItems" list; returns a map of object AP id => current UTC
# NaiveDateTime, so all pins of one call share (nearly) the same timestamp.
# NOTE(review): the mapper only matches items that are maps with an "id"
# key; an item given as a bare id string would raise FunctionClauseError —
# confirm the accepted input shapes.
1657 def pin_data_from_featured_collection(%{
1659 "orderedItems" => objects
1661 when type in ["OrderedCollection", "Collection"] do
1662 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
# Fetches an actor's featured collection and converts it into pin data.
#
# The nil clause handles actors without a featured address.
1665 def fetch_and_prepare_featured_from_ap_id(nil) do
# With an address: fetch the collection and return
# {:ok, pin_data_from_featured_collection(data)} on success.
1669 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1670 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1671 {:ok, pin_data_from_featured_collection(data)}
# A failed fetch is logged as an error together with the address.
1674 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
# Makes sure the objects pinned by a user are available locally.
#
# A nil argument is a no-op returning nil.
1679 def pinned_fetch_task(nil), do: nil
# For user data carrying `pinned_objects`: every pin must either already be
# cached or be fetchable via Fetcher.fetch_object_from_id/1. Enum.all?/2
# stops at the first pin for which neither holds.
1681 def pinned_fetch_task(%{pinned_objects: pins}) do
1682 if Enum.all?(pins, fn {ap_id, _} ->
1683 Object.get_cached_by_ap_id(ap_id) ||
1684 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
# Creates or refreshes the local record of the remote user at `ap_id`.
1692 def make_user_from_ap_id(ap_id) do
1693 user = User.get_cached_by_ap_id(ap_id)
# A cached user that is not AP-enabled is upgraded through the Transmogrifier
# instead of being re-fetched here.
1695 if user && !User.ap_enabled?(user) do
1696 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1698 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
# Pinned objects are fetched in a separate process started with
# Task.start/1; the hard match raises if the task cannot be started.
1699 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
# One path applies the fetched data through a remote-user changeset and
# updates the cache.
1703 |> User.remote_user_changeset(data)
1704 |> User.update_and_set_cache()
# The other path first resolves a possible nickname clash with an older user.
1706 maybe_handle_clashing_nickname(data)
1709 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id
# found there.
#
# Any WebFinger result other than {:ok, %{"ap_id" => non-nil}} yields
# {:error, "No AP id in WebFinger"}.
1717 def make_user_from_nickname(nickname) do
1718 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1719 make_user_from_ap_id(ap_id)
1721 _e -> {:error, "No AP id in WebFinger"}
# Delegates to entire_thread_visible_for_user?/2 for the given activity and
# user; both arguments must be structs of the expected type.
1725 # filter out broken threads
1726 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1727 entire_thread_visible_for_user?(activity, user)
# Public entry point for per-activity containment; currently it only applies
# contain_broken_threads/2 and returns its result.
1730 # do post-processing on a specific activity
1731 def contain_activity(%Activity{} = activity, %User{} = user) do
1732 contain_broken_threads(activity, user)
# Builds a query for direct messages: activities of type "Create" with
# "direct" visibility, ordered by ascending activity id.
1735 def fetch_direct_messages_query do
1737 |> restrict_type(%{type: "Create"})
1738 |> restrict_visibility(%{visibility: "direct"})
1739 |> order_by([activity], asc: activity.id)