1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
68 defp check_actor_can_insert(_), do: true
# Checks the embedded object's "content" against the `[:instance, :remote_limit]`
# setting read through Config.get/1. Returns the boolean result of comparing the
# content's String.length/1 with that limit. Data without a non-nil "content"
# under "object" passes with `true`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?/1` holds for `object`.

When the object is not public, returns `{:ok, actor}` without calling into `User`.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?/1` holds for `object`.

When the object is not public, returns `{:ok, actor}` without calling into `User`.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.update_last_status_at/1` for `actor` when `is_public?/1` holds for `object`.

When the object is not public, returns `{:ok, actor}` without calling into `User`.
"""
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
98 defp increase_replies_count_if_reply(_create_data), do: :noop
100 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
113 Repo.insert(%Activity{
116 recipients: recipients,
117 actor: object["actor"]
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
149 %Activity{} = activity ->
155 {:containment, _} = error ->
158 {:error, _} = error ->
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
166 recipients: recipients,
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
181 defp insert_activity_with_expiration(data, local, recipients) do
185 actor: data["actor"],
186 recipients: recipients
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
200 stream_out_participations(participations)
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
225 defp get_participations({:ok, conversation}) do
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
231 defp get_participations(_), do: []
233 def stream_out_participations(participations) do
236 |> Repo.preload(:user)
238 Streamer.stream("participation", participations)
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
259 def stream_out_participations(_, _), do: :noop
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
270 def stream_out(_activity) do
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
290 %{to: to, actor: actor, published: published, context: context, object: object},
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
305 {:quick_insert, true, activity} ->
308 {:fake, true, activity} ->
312 Repo.rollback(message)
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
321 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
322 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
323 additional = params[:additional] || %{}
324 # only accept false as false value
325 local = !(params[:local] == false)
326 published = params[:published]
330 %{to: to, actor: actor, published: published, context: context, object: object},
334 with {:ok, activity} <- insert(listen_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
341 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
342 {:ok, Activity.t()} | nil | {:error, any()}
343 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
344 with {:ok, result} <-
345 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
350 defp do_unfollow(follower, followed, activity_id, local) do
351 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
352 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
353 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
354 {:ok, activity} <- insert(unfollow_data, local),
355 _ <- notify_and_stream(activity),
356 :ok <- maybe_federate(activity) do
360 {:error, error} -> Repo.rollback(error)
364 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
366 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
380 # only accept false as false value
381 local = !(params[:local] == false)
382 forward = !(params[:forward] == false)
384 additional = params[:additional] || %{}
388 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
390 Map.merge(additional, %{"to" => [], "cc" => []})
393 with flag_data <- make_flag_data(params, additional),
394 {:ok, activity} <- insert(flag_data, local),
395 {:ok, stripped_activity} <- strip_report_status_data(activity),
396 _ <- notify_and_stream(activity),
398 maybe_federate(stripped_activity) do
399 User.all_superusers()
400 |> Enum.filter(fn user -> user.ap_id != actor end)
401 |> Enum.filter(fn user -> not is_nil(user.email) end)
402 |> Enum.each(fn superuser ->
404 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
405 |> Pleroma.Emails.Mailer.deliver_async()
410 {:error, error} -> Repo.rollback(error)
414 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
415 def move(%User{} = origin, %User{} = target, local \\ true) do
418 "actor" => origin.ap_id,
419 "object" => origin.ap_id,
420 "target" => target.ap_id,
421 "to" => [origin.follower_address]
424 with true <- origin.ap_id in target.also_known_as,
425 {:ok, activity} <- insert(params, local),
426 _ <- notify_and_stream(activity) do
427 maybe_federate(activity)
429 BackgroundWorker.enqueue("move_following", %{
430 "origin_id" => origin.id,
431 "target_id" => target.id
436 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
441 def fetch_activities_for_context_query(context, opts) do
442 public = [Constants.as_public()]
446 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
449 from(activity in Activity)
450 |> maybe_preload_objects(opts)
451 |> maybe_preload_bookmarks(opts)
452 |> maybe_set_thread_muted_field(opts)
453 |> restrict_blocked(opts)
454 |> restrict_blockers_visibility(opts)
455 |> restrict_recipients(recipients, opts[:user])
456 |> restrict_filtered(opts)
460 "?->>'type' = ? and ?->>'context' = ?",
467 |> exclude_poll_votes(opts)
469 |> order_by([activity], desc: activity.id)
472 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
473 def fetch_activities_for_context(context, opts \\ %{}) do
475 |> fetch_activities_for_context_query(opts)
479 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
480 FlakeId.Ecto.CompatType.t() | nil
481 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
483 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
484 |> restrict_visibility(%{visibility: "direct"})
490 defp fetch_paginated_optimized(query, opts, pagination) do
491 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
492 # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
493 opts = Map.put(opts, :skip_extra_order, true)
495 Pagination.fetch_paginated(query, opts, pagination)
498 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
499 list_memberships = Pleroma.List.memberships(opts[:user])
501 fetch_activities_query(recipients ++ list_memberships, opts)
502 |> fetch_paginated_optimized(opts, pagination)
504 |> maybe_update_cc(list_memberships, opts[:user])
507 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
508 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
509 opts = Map.delete(opts, :user)
511 [Constants.as_public()]
512 |> fetch_activities_query(opts)
513 |> restrict_unlisted(opts)
514 |> fetch_paginated_optimized(opts, pagination)
517 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
518 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
520 |> Map.put(:restrict_unlisted, true)
521 |> fetch_public_or_unlisted_activities(pagination)
524 @valid_visibilities ~w[direct unlisted public private]
526 defp restrict_visibility(query, %{visibility: visibility})
527 when is_list(visibility) do
528 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
533 "activity_visibility(?, ?, ?) = ANY (?)",
541 Logger.error("Could not restrict visibility to #{visibility}")
545 defp restrict_visibility(query, %{visibility: visibility})
546 when visibility in @valid_visibilities do
550 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
554 defp restrict_visibility(_query, %{visibility: visibility})
555 when visibility not in @valid_visibilities do
556 Logger.error("Could not restrict visibility to #{visibility}")
559 defp restrict_visibility(query, _visibility), do: query
561 defp exclude_visibility(query, %{exclude_visibilities: visibility})
562 when is_list(visibility) do
563 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
568 "activity_visibility(?, ?, ?) = ANY (?)",
576 Logger.error("Could not exclude visibility to #{visibility}")
581 defp exclude_visibility(query, %{exclude_visibilities: visibility})
582 when visibility in @valid_visibilities do
587 "activity_visibility(?, ?, ?) = ?",
596 defp exclude_visibility(query, %{exclude_visibilities: visibility})
597 when visibility not in [nil | @valid_visibilities] do
598 Logger.error("Could not exclude visibility to #{visibility}")
602 defp exclude_visibility(query, _visibility), do: query
604 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
607 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
610 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
613 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
617 defp restrict_thread_visibility(query, _, _), do: query
619 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
622 |> Map.put(:user, reading_user)
623 |> Map.put(:actor_id, user.ap_id)
626 godmode: params[:godmode],
627 reading_user: reading_user
629 |> user_activities_recipients()
630 |> fetch_activities(params)
634 def fetch_user_activities(user, reading_user, params \\ %{})
636 def fetch_user_activities(user, reading_user, %{total: true} = params) do
637 result = fetch_activities_for_user(user, reading_user, params)
639 Keyword.put(result, :items, Enum.reverse(result[:items]))
642 def fetch_user_activities(user, reading_user, params) do
644 |> fetch_activities_for_user(reading_user, params)
648 defp fetch_activities_for_user(user, reading_user, params) do
651 |> Map.put(:type, ["Create", "Announce"])
652 |> Map.put(:user, reading_user)
653 |> Map.put(:actor_id, user.ap_id)
654 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
657 if User.blocks?(reading_user, user) do
661 |> Map.put(:blocking_user, reading_user)
662 |> Map.put(:muting_user, reading_user)
665 pagination_type = Map.get(params, :pagination_type) || :keyset
668 godmode: params[:godmode],
669 reading_user: reading_user
671 |> user_activities_recipients()
672 |> fetch_activities(params, pagination_type)
675 def fetch_statuses(reading_user, %{total: true} = params) do
676 result = fetch_activities_for_reading_user(reading_user, params)
677 Keyword.put(result, :items, Enum.reverse(result[:items]))
680 def fetch_statuses(reading_user, params) do
682 |> fetch_activities_for_reading_user(params)
686 defp fetch_activities_for_reading_user(reading_user, params) do
687 params = Map.put(params, :type, ["Create", "Announce"])
690 godmode: params[:godmode],
691 reading_user: reading_user
693 |> user_activities_recipients()
694 |> fetch_activities(params, :offset)
697 defp user_activities_recipients(%{godmode: true}), do: []
699 defp user_activities_recipients(%{reading_user: reading_user}) do
701 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
703 [Constants.as_public()]
707 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
708 raise "Can't use the child object without preloading!"
711 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
713 [activity, object] in query,
716 "?->>'type' != ? or ?->>'actor' != ?",
725 defp restrict_announce_object_actor(query, _), do: query
# Narrows `query` to activities whose id is greater than the `:since_id` option.
# An empty-string `:since_id`, or options without that key, leave the query unchanged.
defp restrict_since(query, opts) do
  case opts do
    %{since_id: ""} -> query
    %{since_id: since_id} -> where(query, [activity], activity.id > ^since_id)
    _ -> query
  end
end
# Applies the `:tag_all` option against the `tag` entry of the second query
# binding's `data`:
#   * with `skip_preload: true` it raises via raise_on_missing_preload/0;
#   * a non-empty list is matched with the `?&` operator;
#   * a single binary is handed to restrict_embedded_tag_any/2 as `:tag`;
#   * any other options leave the query unchanged.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: tag})

defp restrict_embedded_tag_all(query, _), do: query
# Applies the `:tag` option against the `tag` entry of the second query
# binding's `data`:
#   * with `skip_preload: true` it raises via raise_on_missing_preload/0;
#   * a non-empty list is matched with the `?|` operator;
#   * a single binary is wrapped in a list and re-dispatched;
#   * any other options leave the query unchanged.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = accepted_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^accepted_tags)
  )
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: [tag]})

defp restrict_embedded_tag_any(query, _), do: query
# Applies the `:tag_reject` option against the `tag` entry of the second query
# binding's `data`:
#   * with `skip_preload: true` it raises via raise_on_missing_preload/0;
#   * a non-empty list is matched with a negated `?|` operator;
#   * a single binary is wrapped in a list and re-dispatched;
#   * any other options leave the query unchanged.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject),
     do: restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query selecting the distinct `object_id`s from "hashtags_objects" whose
# joined Pleroma.Hashtag row has a `name` contained in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
795 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
796 raise_on_missing_preload()
799 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
800 restrict_hashtag_any(query, %{tag: single_tag})
803 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
805 [_activity, object] in query,
809 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
810 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
811 AND hashtags_objects.object_id = ?) @> ?
820 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
821 restrict_hashtag_all(query, %{tag_all: [tag]})
824 defp restrict_hashtag_all(query, _), do: query
826 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
827 raise_on_missing_preload()
830 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
832 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
835 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
837 [_activity, object] in query,
838 join: hto in "hashtags_objects",
839 on: hto.object_id == object.id,
840 where: hto.hashtag_id in ^hashtag_ids,
841 distinct: [desc: object.id],
842 order_by: [desc: object.id]
846 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
847 restrict_hashtag_any(query, %{tag: [tag]})
850 defp restrict_hashtag_any(query, _), do: query
# Applies the `:tag_reject` option using the hashtags tables:
#   * with `skip_preload: true` it raises via raise_on_missing_preload/0;
#   * a non-empty list excludes rows whose second-binding id appears in the
#     subquery built by object_ids_query_for_tags/1;
#   * a single binary is wrapped in a list and re-dispatched;
#   * any other options leave the query unchanged.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject),
  do: restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_hashtag_reject_any(query, _), do: query
# Shared failure path for restrictions that need the child object but were asked
# to run with preloading skipped. Always raises a RuntimeError.
defp raise_on_missing_preload do
  raise RuntimeError, message: "Can't use the child object without preloading!"
end
873 defp restrict_recipients(query, [], _user), do: query
875 defp restrict_recipients(query, recipients, nil) do
876 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
879 defp restrict_recipients(query, recipients, user) do
882 where: fragment("? && ?", ^recipients, activity.recipients),
883 or_where: activity.actor == ^user.ap_id
# Keeps only activities with `local == true` when the options carry
# `local_only: true`; any other options return the query untouched.
defp restrict_local(query, opts) do
  case opts do
    %{local_only: true} -> where(query, [activity], activity.local == true)
    _ -> query
  end
end
# Keeps only activities with `local == false` when the options carry
# `remote: true`; any other options return the query untouched.
defp restrict_remote(query, opts) do
  case opts do
    %{remote: true} -> where(query, [activity], activity.local == false)
    _ -> query
  end
end
# Keeps only activities whose `actor` column equals the `:actor_id` option.
# Options without `:actor_id` leave the query unchanged.
defp restrict_actor(query, opts) do
  case opts do
    %{actor_id: actor_id} -> where(query, [activity], activity.actor == ^actor_id)
    _ -> query
  end
end
# Filters on the activity's `data->>'type'` using the `:type` option. A binary is
# compared with `=`; any other value is passed to `= ANY(?)`. Options without
# `:type` leave the query unchanged.
defp restrict_type(query, %{type: type}) do
  if is_binary(type) do
    where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
  else
    where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
  end
end

defp restrict_type(query, _), do: query
# Filters on the activity's `data->>'state'` being equal to the `:state` option.
# Options without `:state` leave the query unchanged.
defp restrict_state(query, opts) do
  case opts do
    %{state: state} ->
      where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

    _ ->
      query
  end
end
# Filters on the `likes` entry of the second query binding's `data` containing
# the `:favorited_by` ap_id (via the `?` operator). Options without
# `:favorited_by` leave the query unchanged.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
# Applies the `:only_media` option:
#   * combined with `skip_preload: true` it raises, because the filter reads the
#     child object's data. This goes through raise_on_missing_preload/0, the same
#     helper the tag restrictors use, so the message is defined in one place
#     (the raised text is unchanged);
#   * `only_media: true` keeps activities whose type is "Create" and whose object
#     `attachment` entry is not equal to the empty list;
#   * any other options leave the query unchanged.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  from(
    [activity, object] in query,
    where: fragment("(?)->>'type' = ?", activity.data, "Create"),
    where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
944 defp restrict_replies(query, %{exclude_replies: true}) do
946 [_activity, object] in query,
947 where: fragment("?->>'inReplyTo' is null", object.data)
951 defp restrict_replies(query, %{
952 reply_filtering_user: %User{} = user,
953 reply_visibility: "self"
956 [activity, object] in query,
959 "?->>'inReplyTo' is null OR ? = ANY(?)",
967 defp restrict_replies(query, %{
968 reply_filtering_user: %User{} = user,
969 reply_visibility: "following"
972 [activity, object] in query,
976 ?->>'type' != 'Create' -- This isn't a Create
977 OR ?->>'inReplyTo' is null -- this isn't a reply
978 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
979 -- unless they are the author (because authors
980 -- are also part of the recipients). This leads
981 -- to a bug that self-replies by friends won't
983 OR ? = ? -- The actor is us
987 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
996 defp restrict_replies(query, _), do: query
# Drops activities whose `data->>'type'` is 'Announce' when the options carry
# `exclude_reblogs: true`; any other options return the query untouched.
defp restrict_reblogs(query, opts) do
  case opts do
    %{exclude_reblogs: true} ->
      where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

    _ ->
      query
  end
end
1004 defp restrict_muted(query, %{with_muted: true}), do: query
1006 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1007 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1010 from([activity] in query,
1011 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1014 "not (?->'to' \\?| ?) or ? = ?",
1022 unless opts[:skip_preload] do
1023 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1029 defp restrict_muted(query, _), do: query
1031 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1032 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1033 domain_blocks = user.domain_blocks || []
1035 following_ap_ids = User.get_friends_ap_ids(user)
1038 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1041 [activity, object: o] in query,
1042 # You don't block the author
1043 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1045 # You don't block any recipients, and didn't author the post
1048 "((not (? && ?)) or ? = ?)",
1049 activity.recipients,
1055 # You don't block the domain of any recipients, and didn't author the post
1058 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1059 activity.recipients,
1065 # It's not a boost of a user you block
1068 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1074 # You don't block the author's domain, and also don't follow the author
1077 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1084 # Same as above, but checks the Object
1087 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1096 defp restrict_blocked(query, _), do: query
1098 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1099 if Config.get([:activitypub, :blockers_visible]) == true do
1102 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1106 # The author doesn't block you
1107 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1109 # It's not a boost of a user that blocks you
1112 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1121 defp restrict_blockers_visibility(query, _), do: query
1123 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1128 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1130 ^[Constants.as_public()]
1135 defp restrict_unlisted(query, _), do: query
1137 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1139 [activity, object: o] in query,
1142 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1151 defp restrict_pinned(query, _), do: query
1153 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1154 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1160 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1168 defp restrict_muted_reblogs(query, _), do: query
1170 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1173 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1177 defp restrict_instance(query, _), do: query
1179 defp restrict_filtered(query, %{user: %User{} = user}) do
1180 case Filter.compose_regex(user) do
1185 from([activity, object] in query,
1187 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1188 activity.actor == ^user.ap_id
1193 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1194 restrict_filtered(query, %{user: user})
1197 defp restrict_filtered(query, _), do: query
1199 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1201 defp exclude_poll_votes(query, _) do
1202 if has_named_binding?(query, :object) do
1203 from([activity, object: o] in query,
1204 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1211 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1213 defp exclude_chat_messages(query, _) do
1214 if has_named_binding?(query, :object) do
1215 from([activity, object: o] in query,
1216 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1223 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1225 defp exclude_invisible_actors(query, _opts) do
1227 User.Query.build(%{invisible: true, select: [:ap_id]})
1229 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1231 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity whose id equals the `:exclude_id` option from the result.
# Only binary ids are honoured; anything else leaves the query unchanged.
defp exclude_id(query, opts) do
  case opts do
    %{exclude_id: id} when is_binary(id) -> where(query, [activity], activity.id != ^id)
    _ -> query
  end
end
1240 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1242 defp maybe_preload_objects(query, _) do
1244 |> Activity.with_preloaded_object()
1247 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1249 defp maybe_preload_bookmarks(query, opts) do
1251 |> Activity.with_preloaded_bookmark(opts[:user])
1254 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1256 |> Activity.with_preloaded_report_notes()
1259 defp maybe_preload_report_notes(query, _), do: query
1261 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1263 defp maybe_set_thread_muted_field(query, opts) do
1265 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1268 defp maybe_order(query, %{order: :desc}) do
1270 |> order_by(desc: :id)
1273 defp maybe_order(query, %{order: :asc}) do
1275 |> order_by(asc: :id)
1278 defp maybe_order(query, _), do: query
1280 defp normalize_fetch_activities_query_opts(opts) do
1281 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1283 value when is_bitstring(value) ->
1284 Map.put(opts, key, Hashtag.normalize_name(value))
1286 value when is_list(value) ->
1289 |> Enum.map(&Hashtag.normalize_name/1)
1292 Map.put(opts, key, normalized_value)
1300 defp fetch_activities_query_ap_ids_ops(opts) do
1301 source_user = opts[:muting_user]
1302 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1304 ap_id_relationships =
1305 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1306 [:block | ap_id_relationships]
1311 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1313 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1314 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1316 restrict_muted_reblogs_opts =
1317 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1319 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the activity query for `recipients`, chaining every preload, ordering,
# restriction and exclusion step below; each step receives `opts` (or a derived map).
# `opts` defaults to an empty map.
1322 def fetch_activities_query(recipients, opts \\ %{}) do
# Hashtag options are normalized before any step reads them.
1323 opts = normalize_fetch_activities_query_opts(opts)
# Block / mute / reblog-mute ap_ids are preloaded once and handed to their restrict_* steps.
1325 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1326 fetch_activities_query_ap_ids_ops(opts)
# Read from instance config; presumably an entry of the `config` map passed to
# restrict_thread_visibility/3 below (the binding itself is elided in this listing).
1329 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
# NOTE(review): the head of this pipeline is elided in this listing.
1334 |> maybe_preload_objects(opts)
1335 |> maybe_preload_bookmarks(opts)
1336 |> maybe_preload_report_notes(opts)
1337 |> maybe_set_thread_muted_field(opts)
1338 |> maybe_order(opts)
1339 |> restrict_recipients(recipients, opts[:user])
1340 |> restrict_replies(opts)
1341 |> restrict_since(opts)
1342 |> restrict_local(opts)
1343 |> restrict_remote(opts)
1344 |> restrict_actor(opts)
1345 |> restrict_type(opts)
1346 |> restrict_state(opts)
1347 |> restrict_favorited_by(opts)
1348 |> restrict_blocked(restrict_blocked_opts)
1349 |> restrict_blockers_visibility(opts)
1350 |> restrict_muted(restrict_muted_opts)
1351 |> restrict_filtered(opts)
1352 |> restrict_media(opts)
1353 |> restrict_visibility(opts)
1354 |> restrict_thread_visibility(opts, config)
1355 |> restrict_reblogs(opts)
1356 |> restrict_pinned(opts)
1357 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1358 |> restrict_instance(opts)
1359 |> restrict_announce_object_actor(opts)
# NOTE(review): restrict_filtered/2 is already applied earlier in this pipeline with the
# same `opts`; this second call looks redundant — confirm before removing it.
1360 |> restrict_filtered(opts)
1361 |> Activity.restrict_deactivated_users()
1362 |> exclude_poll_votes(opts)
1363 |> exclude_chat_messages(opts)
1364 |> exclude_invisible_actors(opts)
1365 |> exclude_visibility(opts)
# Hashtag filtering uses one of two sets of restrict functions depending on the
# :improved_hashtag_timeline feature flag.
# NOTE(review): the `else` separating the two sets is elided in this listing.
1367 if Config.feature_enabled?(:improved_hashtag_timeline) do
1369 |> restrict_hashtag_any(opts)
1370 |> restrict_hashtag_all(opts)
1371 |> restrict_hashtag_reject_any(opts)
1374 |> restrict_embedded_tag_any(opts)
1375 |> restrict_embedded_tag_all(opts)
1376 |> restrict_embedded_tag_reject_any(opts)
1381 Fetches the activities favourited by a user, ordered by when they were added to favourites (most recent first).
# `params` defaults to an empty map and `pagination` to `:keyset`.
1383 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1384 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
# Starts from the user's "Like" activities and joins the liked object and its activity.
# NOTE(review): the head of this pipeline is elided in this listing.
1386 |> Activity.Queries.by_actor()
1387 |> Activity.Queries.by_type("Like")
1388 |> Activity.with_joined_object()
1389 |> Object.with_joined_activity()
# The selected row is the joined activity, carrying its object and using the Like's id
# as `pagination_id`, so paging follows the order of the likes rather than of the posts.
1390 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1391 |> order_by([like, _, _], desc_nulls_last: like.id)
# `skip_order: true` is merged over `params`; presumably this stops the paginator from
# replacing the ordering set above — confirm against Pagination.fetch_paginated.
1392 |> Pagination.fetch_paginated(
1393 Map.merge(params, %{skip_order: true}),
# Adds the user to the "cc" of activities that reached them through a list in "bcc".
# This clause only runs for a non-empty `list_memberships` list and a %User{} struct.
1398 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1399 Enum.map(activities, fn
# Only activities whose data has a non-empty "bcc" list are inspected.
1400 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
# If any bcc entry is one of the user's list memberships, the user's ap_id is prepended
# to the activity's "cc".
# NOTE(review): the non-matching branches are elided in this listing.
1401 if Enum.any?(bcc, &(&1 in list_memberships)) do
1402 update_in(activity.data["cc"], &[user_ap_id | &1])
# With any other arguments the activities are returned unchanged.
1412 defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities addressed to the given recipient sets.
1414 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1415 from(activity in query,
# An activity matches when its recipients overlap `recipients` (`&&` is the PostgreSQL
# array-overlap operator), or when they overlap `recipients_with_public` and also
# contain the public address.
# NOTE(review): the keyword introducing this condition is elided in this listing.
1417 fragment("? && ?", activity.recipients, ^recipients) or
1418 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1419 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities bounded by two recipient sets.
# `pagination` defaults to `:keyset`.
# NOTE(review): part of the parameter list is elided in this listing.
1423 def fetch_activities_bounded(
1425 recipients_with_public,
1427 pagination \\ :keyset
# The base query is built with an empty recipient list; recipient filtering is done by
# fetch_activities_bounded_query/3 instead, and the result is then paginated.
1429 fetch_activities_query([], opts)
1430 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1431 |> Pagination.fetch_paginated(opts, pagination)
# Stores an uploaded file and records it as an Object.
# `opts` are forwarded to Upload.store/2; `opts[:actor]`, when present, is stored
# under the "actor" key of the object data.
1435 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1436 def upload(file, opts \\ []) do
# A result from Upload.store/2 that is not `{:ok, data}` falls through the `with`
# unchanged (assuming no `else` branch is hidden among the elided lines).
1437 with {:ok, data} <- Upload.store(file, opts) do
1438 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1440 Repo.insert(%Object{data: obj_data})
# Extracts a single URL from an actor's "url" property, which may come in several shapes.
1444 @spec get_actor_url(any()) :: binary() | nil
# A plain string is already the URL.
1445 defp get_actor_url(url) when is_binary(url), do: url
# A map with a string "href" yields that href.
1446 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
# NOTE(review): the body of the list clause is elided in this listing.
1448 defp get_actor_url(url) when is_list(url) do
# Anything else yields nil.
1454 defp get_actor_url(_url), do: nil
# Normalizes an actor image value (used for "icon" and "image" in object_to_user_data/1).
# A map with a "url" is rebuilt with that url wrapped as `[%{"href" => url}]`.
# NOTE(review): the other keys of the rebuilt map are elided in this listing.
1456 defp normalize_image(%{"url" => url}) do
1459 "url" => [%{"href" => url}]
# A list is reduced to its first element; an empty list gives nil via the clause below.
1463 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
# Any other value (including nil) yields nil.
1464 defp normalize_image(_), do: nil
# Converts a fetched actor document (`data`, a string-keyed map) into the attribute map
# that make_user_from_ap_id/1 later passes to User.remote_user_changeset.
1466 defp object_to_user_data(data) do
# Profile fields: attachments of type "PropertyValue", reduced to their "name"/"value".
# NOTE(review): the filter function has a single clause, so an attachment without a
# "type" key (or a non-map entry) would raise FunctionClauseError — confirm inputs.
1469 |> Map.get("attachment", [])
1470 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1471 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: tags of type "Emoji", mapped from name (surrounding colons trimmed)
# to the icon url.
# NOTE(review): the visible Map.new clause requires "icon" => %{"url" => _} and "name";
# an Emoji tag lacking either would raise, unless a fallback clause is elided — confirm.
1475 |> Map.get("tag", [])
1477 %{"type" => "Emoji"} -> true
1480 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1481 {String.trim(name, ":"), url}
1484 is_locked = data["manuallyApprovesFollowers"] || false
1485 capabilities = data["capabilities"] || %{}
1486 accepts_chat_messages = capabilities["acceptsChatMessages"]
# From here on `data` is the document as adjusted by Transmogrifier.maybe_fix_user_object/1;
# the values read above come from the unadjusted document.
1487 data = Transmogrifier.maybe_fix_user_object(data)
1488 is_discoverable = data["discoverable"] || false
1489 invisible = data["invisible"] || false
1490 actor_type = data["type"] || "Person"
1492 featured_address = data["featured"]
# NOTE(review): hard match — any return other than `{:ok, _}` raises MatchError here.
1493 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
# The public key and shared inbox are only read when the enclosing value is a map and
# the leaf is a string.
# NOTE(review): the fallback branches of both checks are elided in this listing.
1496 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1497 data["publicKey"]["publicKeyPem"]
1503 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1504 data["endpoints"]["sharedInbox"]
# Entries of the resulting attribute map (some entries are elided in this listing).
1511 uri: get_actor_url(data["url"]),
1513 banner: normalize_image(data["image"]),
1516 is_locked: is_locked,
1517 is_discoverable: is_discoverable,
1518 invisible: invisible,
1519 avatar: normalize_image(data["icon"]),
1521 follower_address: data["followers"],
1522 following_address: data["following"],
1523 featured_address: featured_address,
1524 bio: data["summary"] || "",
1525 actor_type: actor_type,
1526 also_known_as: Map.get(data, "alsoKnownAs", []),
1527 public_key: public_key,
1528 inbox: data["inbox"],
1529 shared_inbox: shared_inbox,
1530 accepts_chat_messages: accepts_chat_messages,
1531 pinned_objects: pinned_objects
1534 # nickname can be nil because of virtual actors
# With a "preferredUsername" the nickname is "<preferredUsername>@<host of the actor id>";
# without one the nickname is stored as nil.
1535 if data["preferredUsername"] do
1539 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1542 Map.put(user_data, :nickname, nil)
# Fetches a remote user's following and followers collections and derives counters and
# privacy flags from them.
1546 def fetch_follow_information_for_user(user) do
# Each collection is fetched from its address and checked with collection_private/1.
# The steps run in order and the first non-matching result aborts the chain.
1547 with {:ok, following_data} <-
1548 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1549 {:ok, hide_follows} <- collection_private(following_data),
1550 {:ok, followers_data} <-
1551 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1552 {:ok, hide_followers} <- collection_private(followers_data) do
# Counters come from each collection's "totalItems"; non-integer values count as 0
# (see normalize_counter/1).
1555 hide_follows: hide_follows,
1556 follower_count: normalize_counter(followers_data["totalItems"]),
1557 following_count: normalize_counter(following_data["totalItems"]),
1558 hide_followers: hide_followers
# `{:error, _}` results are passed through to the caller unchanged.
1561 {:error, _} = e -> e
# Coerces a remote "totalItems" value to an integer counter.
# Integers are returned as they are.
1566 defp normalize_counter(counter) when is_integer(counter), do: counter
# Anything else (nil, strings, floats, ...) counts as 0.
1567 defp normalize_counter(_), do: 0
# Merges freshly fetched follow information into `user_data` when synchronization is
# enabled and the data qualifies; the tagged tuples below identify which check failed.
1569 def maybe_update_follow_information(user_data) do
1570 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# NOTE(review): this reads `user_data[:type]`, but the map built by object_to_user_data/1
# in this file exposes `:actor_type` rather than `:type` — confirm which key callers
# actually provide, otherwise this check can never pass for that input.
1571 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
# Both collection addresses must be present; `!!` turns the value into a strict boolean.
1573 {:collections_available,
1574 !!(user_data[:following_address] && user_data[:follower_address])},
1576 fetch_follow_information_for_user(user_data) do
# Fetched values override entries already present under `user_data[:info]`.
1577 info = Map.merge(user_data[:info] || %{}, info)
1580 |> Map.put(:info, info)
# NOTE(review): the bodies of the failure branches below are elided in this listing.
1582 {:user_type_check, false} ->
1585 {:collections_available, false} ->
1588 {:enabled, false} ->
1593 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a follow collection is private, returning `{:ok, boolean}` or an error.
# First clause: the collection embeds its first page as a map of a page type.
# NOTE(review): the result of this clause is elided in this listing.
1600 defp collection_private(%{"first" => %{"type" => type}})
1601 when type in ["CollectionPage", "OrderedCollectionPage"],
# Second clause: "first" is not an embedded page, so it is fetched and must turn out
# to be a page type.
1604 defp collection_private(%{"first" => first}) do
1605 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1606 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# A 401/403 response when fetching the page is treated as "private"; other errors
# are returned to the caller unchanged.
1609 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1610 {:error, _} = e -> e
# A collection without a "first" entry is treated as private.
1615 defp collection_private(_data), do: {:ok, true}
# Runs the actor document through MRF.filter/1 and, when the filter returns `{:ok, data}`,
# converts the filtered document with object_to_user_data/1.
# NOTE(review): handling of non-`{:ok, _}` filter results is elided in this listing.
1617 def user_data_from_user_object(data) do
1618 with {:ok, data} <- MRF.filter(data) do
1619 {:ok, object_to_user_data(data)}
# Fetches the actor document at `ap_id`, converts it to user data and, where applicable,
# merges in follow information. Returns `{:ok, data}` on success.
1625 def fetch_and_prepare_user_from_ap_id(ap_id) do
1626 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1627 {:ok, data} <- user_data_from_user_object(data) do
1628 {:ok, maybe_update_follow_information(data)}
# Failures are logged at a level matching their severity: debug for deleted objects,
# info for rejections, error for everything else.
# NOTE(review): the values returned by these branches are elided in this listing.
1630 # If this has been deleted, only log a debug and not an error
1631 {:error, "Object has been deleted" = e} ->
1632 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1635 {:error, {:reject, reason} = e} ->
1636 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1640 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees up a nickname that is already held by a different (older) user record.
1645 def maybe_handle_clashing_nickname(data) do
# Proceeds only when `data` has a string nickname, a user with that nickname exists,
# and that user's ap_id differs from the incoming one.
1646 with nickname when is_binary(nickname) <- data[:nickname],
1647 %User{} = old_user <- User.get_by_nickname(nickname),
1648 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
# NOTE(review): the logging call around this message is elided in this listing.
1650 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
# The old user is renamed to "<id>.<old nickname>" and the cache is refreshed.
1654 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1655 |> User.update_and_set_cache()
# Same ap_id: nothing is changed, the situation is only reported.
1657 {:ap_id_comparison, true} ->
1659 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Builds a map of pinned object ap_id => timestamp from a "featured" collection.
# First clause: an "OrderedCollection" whose items live on a separately fetched page.
# NOTE(review): part of this clause's pattern is elided in this listing.
1667 def pin_data_from_featured_collection(%{
1668 "type" => "OrderedCollection",
1671 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
# Every item is stamped with the current UTC time, not with a time taken from the
# remote data.
# NOTE(review): Map.get/2 returns nil when "orderedItems" is absent, and the function
# clause requires map items with an "id"; either case would raise here — confirm inputs.
1673 |> Map.get("orderedItems")
1674 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1677 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
# Second clause: other "OrderedCollection"/"Collection" shapes are resolved through
# Collections.Fetcher.
1682 def pin_data_from_featured_collection(
1687 when type in ["OrderedCollection", "Collection"] do
# NOTE(review): hard match — a non-`{:ok, _}` result from fetch_collection raises
# MatchError in the caller.
1688 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1689 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
# Fetches the "featured" collection at `ap_id` and converts it into pin data.
# NOTE(review): the body of the `nil` clause is elided in this listing.
1692 def fetch_and_prepare_featured_from_ap_id(nil) do
1696 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1697 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1698 {:ok, pin_data_from_featured_collection(data)}
# A failed fetch is logged as an error.
# NOTE(review): the value returned after logging is elided in this listing;
# object_to_user_data/1 hard-matches this function's result on `{:ok, _}`.
1701 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
# Makes sure a user's pinned objects are available locally.
# Nothing to do without user data.
1706 def pinned_fetch_task(nil), do: nil
1708 def pinned_fetch_task(%{pinned_objects: pins}) do
# Each pin counts as available when it is already cached or can be fetched; `||`
# short-circuits, so a fetch only happens for objects missing from the cache.
# NOTE(review): the branches taken on the overall result are elided in this listing.
1709 if Enum.all?(pins, fn {ap_id, _} ->
1710 Object.get_cached_by_ap_id(ap_id) ||
1711 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
# Creates or refreshes the local record of the remote user identified by `ap_id`.
1719 def make_user_from_ap_id(ap_id) do
1720 user = User.get_cached_by_ap_id(ap_id)
# A cached user that is not AP-enabled is upgraded through Transmogrifier instead.
1722 if user && !User.ap_enabled?(user) do
1723 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1725 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
# Pinned objects are fetched in a separate process started with Task.start/1, so this
# function does not wait for (or observe the outcome of) that work.
1726 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
# An existing user is updated from the fetched data; for a new user, a clashing
# nickname is handled first.
# NOTE(review): the branching between these two paths is elided in this listing.
1730 |> User.remote_user_changeset(data)
1731 |> User.update_and_set_cache()
1733 maybe_handle_clashing_nickname(data)
1736 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and creates/refreshes the user from the
# resulting ap_id.
1744 def make_user_from_nickname(nickname) do
# The WebFinger result must carry a non-nil "ap_id".
1745 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1746 make_user_from_ap_id(ap_id)
# Every other WebFinger outcome, including lookup errors, is reported with this one message.
1748 _e -> {:error, "No AP id in WebFinger"}
1752 # filter out broken threads
# Returns the result of entire_thread_visible_for_user?/2 for the activity and user.
1753 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1754 entire_thread_visible_for_user?(activity, user)
1757 # do post-processing on a specific activity
# Currently this only delegates to contain_broken_threads/2; both arguments must be
# structs of the matched types.
1758 def contain_activity(%Activity{} = activity, %User{} = user) do
1759 contain_broken_threads(activity, user)
# Builds a query for "Create" activities with "direct" visibility, ordered by activity
# id ascending.
# NOTE(review): the head of this pipeline and the closing `end` are not visible in this listing.
1762 def fetch_direct_messages_query do
1764 |> restrict_type(%{type: "Create"})
1765 |> restrict_visibility(%{visibility: "direct"})
1766 |> order_by([activity], asc: activity.id)