1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
67 defp check_actor_can_insert(_), do: true
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
74 defp check_remote_limit(_), do: true
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
84 def update_last_status_at_if_public(actor, object) do
85 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
88 defp increase_replies_count_if_reply(%{
89 "object" => %{"inReplyTo" => reply_ap_id} = object,
92 if is_public?(object) do
93 Object.increase_replies_count(reply_ap_id)
97 defp increase_replies_count_if_reply(_create_data), do: :noop
99 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
101 def persist(%{"type" => type} = object, meta) when type in @object_types do
102 with {:ok, object} <- Object.create(object) do
108 def persist(object, meta) do
109 with local <- Keyword.fetch!(meta, :local),
110 {recipients, _, _} <- get_recipients(object),
112 Repo.insert(%Activity{
115 recipients: recipients,
116 actor: object["actor"]
118 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
119 {:ok, _} <- maybe_create_activity_expiration(activity) do
120 {:ok, activity, meta}
124 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
125 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
128 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
129 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
134 {:ok, map, object} <- insert_full_object(map),
135 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
136 # Splice in the child object if we have one.
137 activity = Maps.put_if_present(activity, :object, object)
139 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
140 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
145 %Activity{} = activity ->
151 {:containment, _} = error ->
154 {:error, _} = error ->
157 {:fake, true, map, recipients} ->
158 activity = %Activity{
162 recipients: recipients,
166 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
169 {:remote_limit_pass, _} ->
170 {:error, :remote_limit}
177 defp insert_activity_with_expiration(data, local, recipients) do
181 actor: data["actor"],
182 recipients: recipients
185 with {:ok, activity} <- Repo.insert(struct) do
186 maybe_create_activity_expiration(activity)
190 def notify_and_stream(activity) do
191 Notification.create_notifications(activity)
193 conversation = create_or_bump_conversation(activity, activity.actor)
194 participations = get_participations(conversation)
196 stream_out_participations(participations)
199 defp maybe_create_activity_expiration(
200 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
203 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
204 activity_id: activity.id,
205 expires_at: expires_at
211 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
213 defp create_or_bump_conversation(activity, actor) do
214 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
215 %User{} = user <- User.get_cached_by_ap_id(actor) do
216 Participation.mark_as_read(user, conversation)
221 defp get_participations({:ok, conversation}) do
223 |> Repo.preload(:participations, force: true)
224 |> Map.get(:participations)
227 defp get_participations(_), do: []
229 def stream_out_participations(participations) do
232 |> Repo.preload(:user)
234 Streamer.stream("participation", participations)
238 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
239 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
240 conversation = Repo.preload(conversation, :participations)
243 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
248 if last_activity_id do
249 stream_out_participations(conversation.participations)
255 def stream_out_participations(_, _), do: :noop
258 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
259 when data_type in ["Create", "Announce", "Delete"] do
261 |> Topics.get_activity_topics()
262 |> Streamer.stream(activity)
266 def stream_out(_activity) do
270 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
271 def create(params, fake \\ false) do
272 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
277 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
278 additional = params[:additional] || %{}
279 # only accept false as false value
280 local = !(params[:local] == false)
281 published = params[:published]
282 quick_insert? = Config.get([:env]) == :benchmark
286 %{to: to, actor: actor, published: published, context: context, object: object},
290 with {:ok, activity} <- insert(create_data, local, fake),
291 {:fake, false, activity} <- {:fake, fake, activity},
292 _ <- increase_replies_count_if_reply(create_data),
293 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
294 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
295 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
296 _ <- notify_and_stream(activity),
297 :ok <- maybe_schedule_poll_notifications(activity),
298 :ok <- maybe_federate(activity) do
301 {:quick_insert, true, activity} ->
304 {:fake, true, activity} ->
308 Repo.rollback(message)
312 defp maybe_schedule_poll_notifications(activity) do
313 PollWorker.schedule_poll_end(activity)
317 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
318 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
319 additional = params[:additional] || %{}
320 # only accept false as false value
321 local = !(params[:local] == false)
322 published = params[:published]
326 %{to: to, actor: actor, published: published, context: context, object: object},
330 with {:ok, activity} <- insert(listen_data, local),
331 _ <- notify_and_stream(activity),
332 :ok <- maybe_federate(activity) do
337 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
338 {:ok, Activity.t()} | nil | {:error, any()}
339 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
340 with {:ok, result} <-
341 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
346 defp do_unfollow(follower, followed, activity_id, local) do
347 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
348 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
349 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
350 {:ok, activity} <- insert(unfollow_data, local),
351 _ <- notify_and_stream(activity),
352 :ok <- maybe_federate(activity) do
356 {:error, error} -> Repo.rollback(error)
360 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
362 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
376 # only accept false as false value
377 local = !(params[:local] == false)
378 forward = !(params[:forward] == false)
380 additional = params[:additional] || %{}
384 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
386 Map.merge(additional, %{"to" => [], "cc" => []})
389 with flag_data <- make_flag_data(params, additional),
390 {:ok, activity} <- insert(flag_data, local),
391 {:ok, stripped_activity} <- strip_report_status_data(activity),
392 _ <- notify_and_stream(activity),
394 maybe_federate(stripped_activity) do
395 User.all_superusers()
396 |> Enum.filter(fn user -> user.ap_id != actor end)
397 |> Enum.filter(fn user -> not is_nil(user.email) end)
398 |> Enum.each(fn superuser ->
400 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
401 |> Pleroma.Emails.Mailer.deliver_async()
406 {:error, error} -> Repo.rollback(error)
410 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
411 def move(%User{} = origin, %User{} = target, local \\ true) do
414 "actor" => origin.ap_id,
415 "object" => origin.ap_id,
416 "target" => target.ap_id
419 with true <- origin.ap_id in target.also_known_as,
420 {:ok, activity} <- insert(params, local),
421 _ <- notify_and_stream(activity) do
422 maybe_federate(activity)
424 BackgroundWorker.enqueue("move_following", %{
425 "origin_id" => origin.id,
426 "target_id" => target.id
431 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
436 def fetch_activities_for_context_query(context, opts) do
437 public = [Constants.as_public()]
441 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
444 from(activity in Activity)
445 |> maybe_preload_objects(opts)
446 |> maybe_preload_bookmarks(opts)
447 |> maybe_set_thread_muted_field(opts)
448 |> restrict_blocked(opts)
449 |> restrict_blockers_visibility(opts)
450 |> restrict_recipients(recipients, opts[:user])
451 |> restrict_filtered(opts)
455 "?->>'type' = ? and ?->>'context' = ?",
462 |> exclude_poll_votes(opts)
464 |> order_by([activity], desc: activity.id)
467 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
468 def fetch_activities_for_context(context, opts \\ %{}) do
470 |> fetch_activities_for_context_query(opts)
474 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
475 FlakeId.Ecto.CompatType.t() | nil
476 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
478 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
479 |> restrict_visibility(%{visibility: "direct"})
485 defp fetch_paginated_optimized(query, opts, pagination) do
486 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
487 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
488 opts = Map.put(opts, :skip_extra_order, true)
490 Pagination.fetch_paginated(query, opts, pagination)
493 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
494 list_memberships = Pleroma.List.memberships(opts[:user])
496 fetch_activities_query(recipients ++ list_memberships, opts)
497 |> fetch_paginated_optimized(opts, pagination)
499 |> maybe_update_cc(list_memberships, opts[:user])
502 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
503 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
504 opts = Map.delete(opts, :user)
506 [Constants.as_public()]
507 |> fetch_activities_query(opts)
508 |> restrict_unlisted(opts)
509 |> fetch_paginated_optimized(opts, pagination)
512 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
513 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
515 |> Map.put(:restrict_unlisted, true)
516 |> fetch_public_or_unlisted_activities(pagination)
519 @valid_visibilities ~w[direct unlisted public private]
521 defp restrict_visibility(query, %{visibility: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 "activity_visibility(?, ?, ?) = ANY (?)",
536 Logger.error("Could not restrict visibility to #{visibility}")
540 defp restrict_visibility(query, %{visibility: visibility})
541 when visibility in @valid_visibilities do
545 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
549 defp restrict_visibility(_query, %{visibility: visibility})
550 when visibility not in @valid_visibilities do
551 Logger.error("Could not restrict visibility to #{visibility}")
554 defp restrict_visibility(query, _visibility), do: query
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when is_list(visibility) do
558 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
563 "activity_visibility(?, ?, ?) = ANY (?)",
571 Logger.error("Could not exclude visibility to #{visibility}")
576 defp exclude_visibility(query, %{exclude_visibilities: visibility})
577 when visibility in @valid_visibilities do
582 "activity_visibility(?, ?, ?) = ?",
591 defp exclude_visibility(query, %{exclude_visibilities: visibility})
592 when visibility not in [nil | @valid_visibilities] do
593 Logger.error("Could not exclude visibility to #{visibility}")
597 defp exclude_visibility(query, _visibility), do: query
599 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
602 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
605 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
608 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
612 defp restrict_thread_visibility(query, _, _), do: query
614 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
617 |> Map.put(:user, reading_user)
618 |> Map.put(:actor_id, user.ap_id)
621 godmode: params[:godmode],
622 reading_user: reading_user
624 |> user_activities_recipients()
625 |> fetch_activities(params)
629 def fetch_user_activities(user, reading_user, params \\ %{})
631 def fetch_user_activities(user, reading_user, %{total: true} = params) do
632 result = fetch_activities_for_user(user, reading_user, params)
634 Keyword.put(result, :items, Enum.reverse(result[:items]))
637 def fetch_user_activities(user, reading_user, params) do
639 |> fetch_activities_for_user(reading_user, params)
643 defp fetch_activities_for_user(user, reading_user, params) do
646 |> Map.put(:type, ["Create", "Announce"])
647 |> Map.put(:user, reading_user)
648 |> Map.put(:actor_id, user.ap_id)
649 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
652 if User.blocks?(reading_user, user) do
656 |> Map.put(:blocking_user, reading_user)
657 |> Map.put(:muting_user, reading_user)
660 pagination_type = Map.get(params, :pagination_type) || :keyset
663 godmode: params[:godmode],
664 reading_user: reading_user
666 |> user_activities_recipients()
667 |> fetch_activities(params, pagination_type)
670 def fetch_statuses(reading_user, %{total: true} = params) do
671 result = fetch_activities_for_reading_user(reading_user, params)
672 Keyword.put(result, :items, Enum.reverse(result[:items]))
675 def fetch_statuses(reading_user, params) do
677 |> fetch_activities_for_reading_user(params)
681 defp fetch_activities_for_reading_user(reading_user, params) do
682 params = Map.put(params, :type, ["Create", "Announce"])
685 godmode: params[:godmode],
686 reading_user: reading_user
688 |> user_activities_recipients()
689 |> fetch_activities(params, :offset)
692 defp user_activities_recipients(%{godmode: true}), do: []
694 defp user_activities_recipients(%{reading_user: reading_user}) do
696 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
698 [Constants.as_public()]
702 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
703 raise "Can't use the child object without preloading!"
706 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
708 [activity, object] in query,
711 "?->>'type' != ? or ?->>'actor' != ?",
720 defp restrict_announce_object_actor(query, _), do: query
722 defp restrict_since(query, %{since_id: ""}), do: query
724 defp restrict_since(query, %{since_id: since_id}) do
725 from(activity in query, where: activity.id > ^since_id)
728 defp restrict_since(query, _), do: query
730 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
731 raise_on_missing_preload()
734 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
736 [_activity, object] in query,
737 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
741 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
742 restrict_embedded_tag_any(query, %{tag: tag})
745 defp restrict_embedded_tag_all(query, _), do: query
747 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
748 raise_on_missing_preload()
751 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
753 [_activity, object] in query,
754 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
758 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
759 restrict_embedded_tag_any(query, %{tag: [tag]})
762 defp restrict_embedded_tag_any(query, _), do: query
764 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
765 raise_on_missing_preload()
768 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
770 [_activity, object] in query,
771 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
775 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
776 when is_binary(tag_reject) do
777 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
780 defp restrict_embedded_tag_reject_any(query, _), do: query
782 defp object_ids_query_for_tags(tags) do
783 from(hto in "hashtags_objects")
784 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
785 |> where([hto, ht], ht.name in ^tags)
786 |> select([hto], hto.object_id)
787 |> distinct([hto], true)
790 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
791 raise_on_missing_preload()
794 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
795 restrict_hashtag_any(query, %{tag: single_tag})
798 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
800 [_activity, object] in query,
804 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
805 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
806 AND hashtags_objects.object_id = ?) @> ?
815 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
816 restrict_hashtag_all(query, %{tag_all: [tag]})
819 defp restrict_hashtag_all(query, _), do: query
821 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
822 raise_on_missing_preload()
825 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
827 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
830 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
832 [_activity, object] in query,
833 join: hto in "hashtags_objects",
834 on: hto.object_id == object.id,
835 where: hto.hashtag_id in ^hashtag_ids,
836 distinct: [desc: object.id],
837 order_by: [desc: object.id]
841 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
842 restrict_hashtag_any(query, %{tag: [tag]})
845 defp restrict_hashtag_any(query, _), do: query
847 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
848 raise_on_missing_preload()
851 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
853 [_activity, object] in query,
854 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
858 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
859 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
862 defp restrict_hashtag_reject_any(query, _), do: query
864 defp raise_on_missing_preload do
865 raise "Can't use the child object without preloading!"
868 defp restrict_recipients(query, [], _user), do: query
870 defp restrict_recipients(query, recipients, nil) do
871 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
874 defp restrict_recipients(query, recipients, user) do
877 where: fragment("? && ?", ^recipients, activity.recipients),
878 or_where: activity.actor == ^user.ap_id
882 defp restrict_local(query, %{local_only: true}) do
883 from(activity in query, where: activity.local == true)
886 defp restrict_local(query, _), do: query
888 defp restrict_remote(query, %{remote: true}) do
889 from(activity in query, where: activity.local == false)
892 defp restrict_remote(query, _), do: query
894 defp restrict_actor(query, %{actor_id: actor_id}) do
895 from(activity in query, where: activity.actor == ^actor_id)
898 defp restrict_actor(query, _), do: query
900 defp restrict_type(query, %{type: type}) when is_binary(type) do
901 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
904 defp restrict_type(query, %{type: type}) do
905 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
908 defp restrict_type(query, _), do: query
910 defp restrict_state(query, %{state: state}) do
911 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
914 defp restrict_state(query, _), do: query
916 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
918 [_activity, object] in query,
919 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
923 defp restrict_favorited_by(query, _), do: query
925 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
926 raise "Can't use the child object without preloading!"
929 defp restrict_media(query, %{only_media: true}) do
931 [activity, object] in query,
932 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
933 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
937 defp restrict_media(query, _), do: query
939 defp restrict_replies(query, %{exclude_replies: true}) do
941 [_activity, object] in query,
942 where: fragment("?->>'inReplyTo' is null", object.data)
946 defp restrict_replies(query, %{
947 reply_filtering_user: %User{} = user,
948 reply_visibility: "self"
951 [activity, object] in query,
954 "?->>'inReplyTo' is null OR ? = ANY(?)",
962 defp restrict_replies(query, %{
963 reply_filtering_user: %User{} = user,
964 reply_visibility: "following"
967 [activity, object] in query,
971 ?->>'type' != 'Create' -- This isn't a Create
972 OR ?->>'inReplyTo' is null -- this isn't a reply
973 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
974 -- unless they are the author (because authors
975 -- are also part of the recipients). This leads
976 -- to a bug that self-replies by friends won't
978 OR ? = ? -- The actor is us
982 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
991 defp restrict_replies(query, _), do: query
993 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
994 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
997 defp restrict_reblogs(query, _), do: query
999 defp restrict_muted(query, %{with_muted: true}), do: query
1001 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1002 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1005 from([activity] in query,
1006 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1009 "not (?->'to' \\?| ?) or ? = ?",
1017 unless opts[:skip_preload] do
1018 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1024 defp restrict_muted(query, _), do: query
1026 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1027 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1028 domain_blocks = user.domain_blocks || []
1030 following_ap_ids = User.get_friends_ap_ids(user)
1033 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1036 [activity, object: o] in query,
1037 # You don't block the author
1038 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1040 # You don't block any recipients, and didn't author the post
1043 "((not (? && ?)) or ? = ?)",
1044 activity.recipients,
1050 # You don't block the domain of any recipients, and didn't author the post
1053 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1054 activity.recipients,
1060 # It's not a boost of a user you block
1063 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1069 # You don't block the author's domain, and also don't follow the author
1072 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1079 # Same as above, but checks the Object
1082 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1091 defp restrict_blocked(query, _), do: query
1093 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1094 if Config.get([:activitypub, :blockers_visible]) == true do
1097 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1101 # The author doesn't block you
1102 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1104 # It's not a boost of a user that blocks you
1107 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1116 defp restrict_blockers_visibility(query, _), do: query
1118 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1123 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1125 ^[Constants.as_public()]
1130 defp restrict_unlisted(query, _), do: query
1132 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1134 [activity, object: o] in query,
1137 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1146 defp restrict_pinned(query, _), do: query
1148 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1149 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1155 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1163 defp restrict_muted_reblogs(query, _), do: query
1165 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1168 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1172 defp restrict_instance(query, _), do: query
1174 defp restrict_filtered(query, %{user: %User{} = user}) do
1175 case Filter.compose_regex(user) do
1180 from([activity, object] in query,
1182 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1183 activity.actor == ^user.ap_id
1188 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1189 restrict_filtered(query, %{user: user})
1192 defp restrict_filtered(query, _), do: query
1194 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1196 defp exclude_poll_votes(query, _) do
1197 if has_named_binding?(query, :object) do
1198 from([activity, object: o] in query,
1199 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1206 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1208 defp exclude_chat_messages(query, _) do
1209 if has_named_binding?(query, :object) do
1210 from([activity, object: o] in query,
1211 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1218 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1220 defp exclude_invisible_actors(query, _opts) do
1222 User.Query.build(%{invisible: true, select: [:ap_id]})
1224 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1226 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1229 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1230 from(activity in query, where: activity.id != ^id)
1233 defp exclude_id(query, _), do: query
1235 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1237 defp maybe_preload_objects(query, _) do
1239 |> Activity.with_preloaded_object()
1242 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1244 defp maybe_preload_bookmarks(query, opts) do
1246 |> Activity.with_preloaded_bookmark(opts[:user])
1249 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1251 |> Activity.with_preloaded_report_notes()
1254 defp maybe_preload_report_notes(query, _), do: query
1256 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1258 defp maybe_set_thread_muted_field(query, opts) do
1260 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1263 defp maybe_order(query, %{order: :desc}) do
1265 |> order_by(desc: :id)
1268 defp maybe_order(query, %{order: :asc}) do
1270 |> order_by(asc: :id)
1273 defp maybe_order(query, _), do: query
1275 defp normalize_fetch_activities_query_opts(opts) do
1276 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1278 value when is_bitstring(value) ->
1279 Map.put(opts, key, Hashtag.normalize_name(value))
1281 value when is_list(value) ->
1284 |> Enum.map(&Hashtag.normalize_name/1)
1287 Map.put(opts, key, normalized_value)
1295 defp fetch_activities_query_ap_ids_ops(opts) do
1296 source_user = opts[:muting_user]
1297 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1299 ap_id_relationships =
1300 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1301 [:block | ap_id_relationships]
1306 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1308 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1309 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1311 restrict_muted_reblogs_opts =
1312 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1314 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1317 def fetch_activities_query(recipients, opts \\ %{}) do
1318 opts = normalize_fetch_activities_query_opts(opts)
1320 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1321 fetch_activities_query_ap_ids_ops(opts)
1324 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1329 |> maybe_preload_objects(opts)
1330 |> maybe_preload_bookmarks(opts)
1331 |> maybe_preload_report_notes(opts)
1332 |> maybe_set_thread_muted_field(opts)
1333 |> maybe_order(opts)
1334 |> restrict_recipients(recipients, opts[:user])
1335 |> restrict_replies(opts)
1336 |> restrict_since(opts)
1337 |> restrict_local(opts)
1338 |> restrict_remote(opts)
1339 |> restrict_actor(opts)
1340 |> restrict_type(opts)
1341 |> restrict_state(opts)
1342 |> restrict_favorited_by(opts)
1343 |> restrict_blocked(restrict_blocked_opts)
1344 |> restrict_blockers_visibility(opts)
1345 |> restrict_muted(restrict_muted_opts)
1346 |> restrict_filtered(opts)
1347 |> restrict_media(opts)
1348 |> restrict_visibility(opts)
1349 |> restrict_thread_visibility(opts, config)
1350 |> restrict_reblogs(opts)
1351 |> restrict_pinned(opts)
1352 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1353 |> restrict_instance(opts)
1354 |> restrict_announce_object_actor(opts)
1355 |> restrict_filtered(opts)
1356 |> Activity.restrict_deactivated_users()
1357 |> exclude_poll_votes(opts)
1358 |> exclude_chat_messages(opts)
1359 |> exclude_invisible_actors(opts)
1360 |> exclude_visibility(opts)
1362 if Config.feature_enabled?(:improved_hashtag_timeline) do
1364 |> restrict_hashtag_any(opts)
1365 |> restrict_hashtag_all(opts)
1366 |> restrict_hashtag_reject_any(opts)
1369 |> restrict_embedded_tag_any(opts)
1370 |> restrict_embedded_tag_all(opts)
1371 |> restrict_embedded_tag_reject_any(opts)
1376 Fetch favorites activities of user with order by sort adds to favorites
1378 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1379 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1381 |> Activity.Queries.by_actor()
1382 |> Activity.Queries.by_type("Like")
1383 |> Activity.with_joined_object()
1384 |> Object.with_joined_activity()
1385 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1386 |> order_by([like, _, _], desc_nulls_last: like.id)
1387 |> Pagination.fetch_paginated(
1388 Map.merge(params, %{skip_order: true}),
1393 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1394 Enum.map(activities, fn
1395 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1396 if Enum.any?(bcc, &(&1 in list_memberships)) do
1397 update_in(activity.data["cc"], &[user_ap_id | &1])
1407 defp maybe_update_cc(activities, _, _), do: activities
1409 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1410 from(activity in query,
1412 fragment("? && ?", activity.recipients, ^recipients) or
1413 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1414 ^Constants.as_public() in activity.recipients)
1418 def fetch_activities_bounded(
1420 recipients_with_public,
1422 pagination \\ :keyset
1424 fetch_activities_query([], opts)
1425 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1426 |> Pagination.fetch_paginated(opts, pagination)
1430 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1431 def upload(file, opts \\ []) do
1432 with {:ok, data} <- Upload.store(file, opts) do
1433 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1435 Repo.insert(%Object{data: obj_data})
1439 @spec get_actor_url(any()) :: binary() | nil
1440 defp get_actor_url(url) when is_binary(url), do: url
1441 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1443 defp get_actor_url(url) when is_list(url) do
1449 defp get_actor_url(_url), do: nil
1451 defp normalize_image(%{"url" => url}) do
1454 "url" => [%{"href" => url}]
1458 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1459 defp normalize_image(_), do: nil
1461 defp object_to_user_data(data) do
1464 |> Map.get("attachment", [])
1465 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1466 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1470 |> Map.get("tag", [])
1472 %{"type" => "Emoji"} -> true
1475 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1476 {String.trim(name, ":"), url}
1479 is_locked = data["manuallyApprovesFollowers"] || false
1480 capabilities = data["capabilities"] || %{}
1481 accepts_chat_messages = capabilities["acceptsChatMessages"]
1482 data = Transmogrifier.maybe_fix_user_object(data)
1483 is_discoverable = data["discoverable"] || false
1484 invisible = data["invisible"] || false
1485 actor_type = data["type"] || "Person"
1487 featured_address = data["featured"]
1488 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1491 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1492 data["publicKey"]["publicKeyPem"]
1498 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1499 data["endpoints"]["sharedInbox"]
1506 uri: get_actor_url(data["url"]),
1508 banner: normalize_image(data["image"]),
1511 is_locked: is_locked,
1512 is_discoverable: is_discoverable,
1513 invisible: invisible,
1514 avatar: normalize_image(data["icon"]),
1516 follower_address: data["followers"],
1517 following_address: data["following"],
1518 featured_address: featured_address,
1519 bio: data["summary"] || "",
1520 actor_type: actor_type,
1521 also_known_as: Map.get(data, "alsoKnownAs", []),
1522 public_key: public_key,
1523 inbox: data["inbox"],
1524 shared_inbox: shared_inbox,
1525 accepts_chat_messages: accepts_chat_messages,
1526 pinned_objects: pinned_objects
1529 # nickname can be nil because of virtual actors
1530 if data["preferredUsername"] do
1534 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1537 Map.put(user_data, :nickname, nil)
1541 def fetch_follow_information_for_user(user) do
1542 with {:ok, following_data} <-
1543 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1544 {:ok, hide_follows} <- collection_private(following_data),
1545 {:ok, followers_data} <-
1546 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1547 {:ok, hide_followers} <- collection_private(followers_data) do
1550 hide_follows: hide_follows,
1551 follower_count: normalize_counter(followers_data["totalItems"]),
1552 following_count: normalize_counter(following_data["totalItems"]),
1553 hide_followers: hide_followers
1556 {:error, _} = e -> e
1561 defp normalize_counter(counter) when is_integer(counter), do: counter
1562 defp normalize_counter(_), do: 0
1564 def maybe_update_follow_information(user_data) do
1565 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1566 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1568 {:collections_available,
1569 !!(user_data[:following_address] && user_data[:follower_address])},
1571 fetch_follow_information_for_user(user_data) do
1572 info = Map.merge(user_data[:info] || %{}, info)
1575 |> Map.put(:info, info)
1577 {:user_type_check, false} ->
1580 {:collections_available, false} ->
1583 {:enabled, false} ->
1588 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1595 defp collection_private(%{"first" => %{"type" => type}})
1596 when type in ["CollectionPage", "OrderedCollectionPage"],
1599 defp collection_private(%{"first" => first}) do
1600 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1601 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1604 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1605 {:error, _} = e -> e
1610 defp collection_private(_data), do: {:ok, true}
1612 def user_data_from_user_object(data) do
1613 with {:ok, data} <- MRF.filter(data) do
1614 {:ok, object_to_user_data(data)}
1620 def fetch_and_prepare_user_from_ap_id(ap_id) do
1621 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1622 {:ok, data} <- user_data_from_user_object(data) do
1623 {:ok, maybe_update_follow_information(data)}
1625 # If this has been deleted, only log a debug and not an error
1626 {:error, "Object has been deleted" = e} ->
1627 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1630 {:error, {:reject, reason} = e} ->
1631 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1635 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1640 def maybe_handle_clashing_nickname(data) do
1641 with nickname when is_binary(nickname) <- data[:nickname],
1642 %User{} = old_user <- User.get_by_nickname(nickname),
1643 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1645 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1649 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1650 |> User.update_and_set_cache()
1652 {:ap_id_comparison, true} ->
1654 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1662 def pin_data_from_featured_collection(%{
1664 "orderedItems" => objects
1666 when type in ["OrderedCollection", "Collection"] do
1667 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1670 def fetch_and_prepare_featured_from_ap_id(nil) do
1674 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1675 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1676 {:ok, pin_data_from_featured_collection(data)}
1679 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1684 def pinned_fetch_task(nil), do: nil
1686 def pinned_fetch_task(%{pinned_objects: pins}) do
1687 if Enum.all?(pins, fn {ap_id, _} ->
1688 Object.get_cached_by_ap_id(ap_id) ||
1689 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1697 def make_user_from_ap_id(ap_id) do
1698 user = User.get_cached_by_ap_id(ap_id)
1700 if user && !User.ap_enabled?(user) do
1701 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1703 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1704 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1708 |> User.remote_user_changeset(data)
1709 |> User.update_and_set_cache()
1711 maybe_handle_clashing_nickname(data)
1714 |> User.remote_user_changeset()
1722 def make_user_from_nickname(nickname) do
1723 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1724 make_user_from_ap_id(ap_id)
1726 _e -> {:error, "No AP id in WebFinger"}
1730 # filter out broken threads
1731 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1732 entire_thread_visible_for_user?(activity, user)
1735 # do post-processing on a specific activity
1736 def contain_activity(%Activity{} = activity, %User{} = user) do
1737 contain_broken_threads(activity, user)
1740 def fetch_direct_messages_query do
1742 |> restrict_type(%{type: "Create"})
1743 |> restrict_visibility(%{visibility: "direct"})
1744 |> order_by([activity], asc: activity.id)