1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Collects the recipients of an activity from its "to", "cc" and "bcc" fields (each defaults to []).
# For "Create" activities the actor is added as well and duplicates are dropped.
# NOTE(review): the return expression is not visible in this excerpt; callers in this file
# match the result as a 3-tuple whose first element is the recipient list.
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Every other activity type: to, cc and bcc are concatenated as-is (no actor, no de-duplication).
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
# Decides whether the actor of an activity may insert it.
# "Delete" and "Undo" activities always pass.
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
# With a binary actor id the cached user is looked up; a user with is_active: true passes.
# NOTE(review): the fallback branch of this case is not visible in this excerpt.
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
# Data without a binary "actor" passes.
68 defp check_actor_can_insert(_), do: true
# Returns true when the embedded object's content is no longer than the configured
# [:instance, :remote_limit]. Length is measured with String.length/1, i.e. in graphemes.
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
# No embedded object, or nil content: nothing to check.
75 defp check_remote_limit(_), do: true
# Increases the actor's note count when `object` is public (is_public?/1);
# otherwise returns {:ok, actor} without touching the count.
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Decreases the actor's note count when `object` is public (is_public?/1);
# otherwise returns {:ok, actor} without touching the count.
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# Calls User.update_last_status_at/1 for the actor when `object` is public (is_public?/1);
# otherwise returns {:ok, actor} unchanged.
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
# When the created object carries an "inReplyTo" id and is public, increases the replies
# count of the replied-to object.
# NOTE(review): the rest of the pattern and the non-public branch are not visible in this excerpt.
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
# Any other create data is not a reply: nothing to do.
98 defp increase_replies_count_if_reply(_create_data), do: :noop
# Types that persist/2 stores through Object.create/1 instead of as an Activity row.
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
# Persists a payload whose "type" is one of @object_types as an object.
# NOTE(review): the success return of this clause is not visible in this excerpt.
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
# Every other payload is inserted as an %Activity{}. `meta` must contain :local
# (Keyword.fetch!/2 raises otherwise). Returns {:ok, activity, meta} on success.
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
113 Repo.insert(%Activity{
116 recipients: recipients,
117 actor: object["actor"]
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
# Inserts the activity described by `map`.
#   local              - passed to insert_activity_with_expiration/3; also gates search indexing
#   fake               - when true the pipeline leaves at the {:fake, ...} step, before the
#                        containment check and the database insert
#   bypass_actor_check - when true, check_actor_can_insert/1 is skipped
# The pipeline only proceeds when Activity.normalize/1 returns nil for the map.
# NOTE(review): several result expressions of the else branches are not visible in this excerpt.
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
# Rich media data is fetched in a separate Task, started through ConcurrentLimiter.limit/2.
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
# Activity.normalize/1 returned an activity instead of nil (presumably an already known one).
149 %Activity{} = activity ->
155 {:containment, _} = error ->
158 {:error, _} = error ->
# Fake insert: an %Activity{} struct is built from the map and recipients.
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
166 recipients: recipients,
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# check_remote_limit/1 failed.
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
# Inserts the activity struct (built from `data`, its actor and `recipients`) and, on success,
# passes the stored activity to maybe_create_activity_expiration/1.
# NOTE(review): the construction of `struct` is only partly visible in this excerpt.
181 defp insert_activity_with_expiration(data, local, recipients) do
185 actor: data["actor"],
186 recipients: recipients
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
# Creates notifications for the activity, creates or bumps its conversation on behalf of
# the actor, and streams the resulting participations.
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
200 stream_out_participations(participations)
# When the activity data carries a %DateTime{} under "expires_at", enqueues a
# PurgeExpiredActivity job with the activity id and that expiry time.
# NOTE(review): the handling of the enqueue result is not visible in this excerpt.
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
# No DateTime expiry on the activity: nothing to schedule.
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for the activity; when `actor` resolves to a cached
# user, that user's participation in the conversation is marked as read.
# NOTE(review): the value returned on success is not visible here; get_participations/1
# handles an {:ok, conversation} tuple.
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
# Returns the participations of the conversation, force-reloaded via Repo.preload/3.
225 defp get_participations({:ok, conversation}) do
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
# Anything other than {:ok, conversation} yields no participations.
231 defp get_participations(_), do: []
# Preloads the :user of the given participations and streams them on the "participation" topic.
233 def stream_out_participations(participations) do
236 |> Repo.preload(:user)
238 Streamer.stream("participation", participations)
# Variant for an object with a "context": looks up the conversation for that context and
# streams its participations only when a latest direct activity id is found for it.
# NOTE(review): the options passed to the lookup are not visible in this excerpt.
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
# Anything else is ignored.
259 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed by
# Topics.get_activity_topics/1.
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
# Other activities are not streamed.
# NOTE(review): the body of this clause is not visible in this excerpt.
270 def stream_out(_activity) do
# Creates an activity from `params` by running do_create/2 inside a database transaction.
# `fake` is forwarded unchanged to do_create/2.
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body of create/2: builds the create data, inserts it, then updates counters,
# notifies/streams, schedules poll notifications and federates.
# The chain leaves early for fake inserts and, in the :benchmark env, after the replies
# count update ("quick insert").
# NOTE(review): the construction of `create_data` and the results of the else branches
# are only partly visible in this excerpt.
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
290 %{to: to, actor: actor, published: published, context: context, object: object},
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
305 {:quick_insert, true, activity} ->
308 {:fake, true, activity} ->
# Failure: abort the surrounding transaction with the error value.
312 Repo.rollback(message)
# Hands the activity to PollWorker.schedule_poll_end/1.
# NOTE(review): what is returned is not visible here; do_create/2 matches the result against :ok.
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
# Makes `follower` unfollow `followed` by running do_unfollow/4 inside a database transaction.
# `activity_id` (optional) and `local` are forwarded unchanged.
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of unfollow/4: finds the latest follow activity, sets its state to
# "cancelled", inserts the unfollow activity, then notifies/streams and federates it.
# An {:error, error} from any step rolls the transaction back.
# NOTE(review): the success return and other else branches are not visible in this excerpt.
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
# Report ("flag") creation. The work runs in do_flag/1 inside a database transaction.
# NOTE(review): the heads of flag/1 and do_flag/1 are not visible in this excerpt, so the
# origin of `actor`, `account`, `statuses` and `content` cannot be confirmed from here.
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
# Two addressing variants: one cc's the reported account, the other addresses nobody.
# NOTE(review): the condition choosing between them is not visible (presumably `forward`).
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
# The report is notified/streamed with full data; the stripped version is what gets federated.
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
# Report e-mails go to every superuser except the reporting actor and those without an e-mail.
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
390 {:error, error} -> Repo.rollback(error)
# Announces that `origin` moved to `target`. The activity is addressed to the origin's
# followers and is only created when the target lists the origin's ap_id in also_known_as.
# After insertion the activity is notified/streamed and federated, and a "move_following"
# background job is enqueued with both user ids.
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id,
401 "to" => [origin.follower_address]
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
# The also_known_as check failed.
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities of a given context, newest id first, with the usual
# preloads and block/recipient/filter restrictions applied; poll votes are excluded
# unless requested through opts.
# NOTE(review): the recipient computation and the type/context fragment arguments are
# only partly visible in this excerpt.
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
# With a user in opts: the user, everything they follow, plus public.
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_blockers_visibility(opts)
435 |> restrict_recipients(recipients, opts[:user])
436 |> restrict_filtered(opts)
440 "?->>'type' = ? and ?->>'context' = ?",
447 |> exclude_poll_votes(opts)
449 |> order_by([activity], desc: activity.id)
# Fetches the activities of `context` using fetch_activities_for_context_query/2.
# NOTE(review): the final step that runs the query is not visible in this excerpt.
452 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
453 def fetch_activities_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(opts)
# Looks up the id of the latest direct-visibility activity in `context` (nil when none,
# per the spec). Preloading is skipped by default; a :skip_preload in opts overrides that
# because opts is the second argument of Map.merge/2.
# NOTE(review): the final limit/select steps are not visible in this excerpt.
459 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
460 FlakeId.Ecto.CompatType.t() | nil
461 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
463 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
464 |> restrict_visibility(%{visibility: "direct"})
# Runs Pagination.fetch_paginated/3 with :skip_extra_order forced to true in opts.
470 defp fetch_paginated_optimized(query, opts, pagination) do
471 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
472 # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
473 opts = Map.put(opts, :skip_extra_order, true)
475 Pagination.fetch_paginated(query, opts, pagination)
# Fetches a page of activities addressed to `recipients` or to any list the opts user is
# a member of, then post-processes the result with maybe_update_cc/3.
# NOTE(review): one pipeline step between pagination and maybe_update_cc/3 is not visible here.
478 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
479 list_memberships = Pleroma.List.memberships(opts[:user])
481 fetch_activities_query(recipients ++ list_memberships, opts)
482 |> fetch_paginated_optimized(opts, pagination)
484 |> maybe_update_cc(list_memberships, opts[:user])
# Fetches activities addressed to the public collection, and additionally to the
# local-public collection when opts has includes_local_public: true.
# The :user key is removed from opts before querying. Unlisted posts are only filtered
# out when opts asks for it (see restrict_unlisted/2).
487 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
488 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
489 includes_local_public = Map.get(opts, :includes_local_public, false)
491 opts = Map.delete(opts, :user)
493 intended_recipients =
494 if includes_local_public do
495 [Constants.as_public(), as_local_public()]
497 [Constants.as_public()]
501 |> fetch_activities_query(opts)
502 |> restrict_unlisted(opts)
503 |> fetch_paginated_optimized(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but with restrict_unlisted: true forced
# into opts, so unlisted posts are excluded.
506 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
507 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
509 |> Map.put(:restrict_unlisted, true)
510 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
513 @valid_visibilities ~w[direct unlisted public private]
# Keeps only activities whose computed visibility (activity_visibility SQL function) is
# one of the requested values. A list is accepted only if every entry is valid;
# otherwise an error is logged.
# NOTE(review): what is returned after logging is not visible in this excerpt.
515 defp restrict_visibility(query, %{visibility: visibility})
516 when is_list(visibility) do
517 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
522 "activity_visibility(?, ?, ?) = ANY (?)",
530 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility value.
534 defp restrict_visibility(query, %{visibility: visibility})
535 when visibility in @valid_visibilities do
539 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Single invalid visibility value: logged as an error.
543 defp restrict_visibility(_query, %{visibility: visibility})
544 when visibility not in @valid_visibilities do
545 Logger.error("Could not restrict visibility to #{visibility}")
# No :visibility option: query is returned unchanged.
548 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by the :exclude_visibilities option.
# A list is accepted only if every entry is a valid visibility; otherwise an error is logged.
# NOTE(review): the where-clauses are only partly visible in this excerpt.
550 defp exclude_visibility(query, %{exclude_visibilities: visibility})
551 when is_list(visibility) do
552 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
557 "activity_visibility(?, ?, ?) = ANY (?)",
565 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility value.
570 defp exclude_visibility(query, %{exclude_visibilities: visibility})
571 when visibility in @valid_visibilities do
576 "activity_visibility(?, ?, ?) = ?",
# Invalid value: logged as an error. nil is not treated as invalid and falls through
# to the pass-through clause below.
585 defp exclude_visibility(query, %{exclude_visibilities: visibility})
586 when visibility not in [nil | @valid_visibilities] do
587 Logger.error("Could not exclude visibility to #{visibility}")
# No (or nil) :exclude_visibilities option: query is returned unchanged.
591 defp exclude_visibility(query, _visibility), do: query
# Restricts to activities whose thread is visible to the user, using the
# thread_visibility SQL function with the user's ap_id and the local-public address.
# The first two clauses match when thread containment is skipped by config or by the user.
# NOTE(review): their bodies are not visible in this excerpt (presumably `do: query`).
593 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
596 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
599 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
600 local_public = as_local_public()
604 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
# No user in opts: query is returned unchanged.
608 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` (actor_id) as seen by `reading_user`.
# Recipients come from user_activities_recipients/1, based on params[:godmode] and
# the reading user.
# NOTE(review): other params set in this pipeline are not visible in this excerpt.
610 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
617 godmode: params[:godmode],
618 reading_user: reading_user
620 |> user_activities_recipients()
621 |> fetch_activities(params)
# Fetches the statuses of `user` as seen by `reading_user`.
625 def fetch_user_activities(user, reading_user, params \\ %{})
# With total: true the fetch result is accessed by key and its :items are reversed in place.
627 def fetch_user_activities(user, reading_user, %{total: true} = params) do
628 result = fetch_activities_for_user(user, reading_user, params)
630 Keyword.put(result, :items, Enum.reverse(result[:items]))
# Without total: true.
# NOTE(review): the step applied after the fetch is not visible in this excerpt.
633 def fetch_user_activities(user, reading_user, params) do
635 |> fetch_activities_for_user(reading_user, params)
# Shared implementation of fetch_user_activities/3: restricts to "Create" and "Announce"
# activities by `user`, passes the user's pinned object ids along, and paginates with
# params[:pagination_type] (default :keyset).
# NOTE(review): both branches of the User.blocks?/2 check are only partly visible here;
# the visible lines set the reading user as blocking_user and muting_user.
639 defp fetch_activities_for_user(user, reading_user, params) do
642 |> Map.put(:type, ["Create", "Announce"])
643 |> Map.put(:user, reading_user)
644 |> Map.put(:actor_id, user.ap_id)
645 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
648 if User.blocks?(reading_user, user) do
652 |> Map.put(:blocking_user, reading_user)
653 |> Map.put(:muting_user, reading_user)
656 pagination_type = Map.get(params, :pagination_type) || :keyset
659 godmode: params[:godmode],
660 reading_user: reading_user
662 |> user_activities_recipients()
663 |> fetch_activities(params, pagination_type)
# Fetches statuses visible to `reading_user`.
# With total: true the fetch result is accessed by key and its :items are reversed in place.
666 def fetch_statuses(reading_user, %{total: true} = params) do
667 result = fetch_activities_for_reading_user(reading_user, params)
668 Keyword.put(result, :items, Enum.reverse(result[:items]))
# Without total: true.
# NOTE(review): the step applied after the fetch is not visible in this excerpt.
671 def fetch_statuses(reading_user, params) do
673 |> fetch_activities_for_reading_user(params)
# Shared implementation of fetch_statuses/2: restricts to "Create" and "Announce"
# activities and always uses :offset pagination.
677 defp fetch_activities_for_reading_user(reading_user, params) do
678 params = Map.put(params, :type, ["Create", "Announce"])
681 godmode: params[:godmode],
682 reading_user: reading_user
684 |> user_activities_recipients()
685 |> fetch_activities(params, :offset)
# Computes the recipient list used to scope user timelines.
# godmode: true yields [], which restrict_recipients/3 treats as "no restriction".
688 defp user_activities_recipients(%{godmode: true}), do: []
# A local reading user sees public posts plus anything addressed to them or to accounts
# they follow; everyone else (nil or remote reader) only sees public posts.
690 defp user_activities_recipients(%{reading_user: reading_user}) do
691 if not is_nil(reading_user) and reading_user.local do
693 Constants.as_public(),
695 reading_user.ap_id | User.following(reading_user)
698 [Constants.as_public()]
# Filters on the actor of the object referenced by "Announce" activities for the given
# announce_filtering_user. Needs the joined object, so it raises when preloading is skipped.
# NOTE(review): the fragment arguments are not visible in this excerpt.
702 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
703 raise "Can't use the child object without preloading!"
706 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
708 [activity, object] in query,
711 "?->>'type' != ? or ?->>'actor' != ?",
# No announce_filtering_user: query is returned unchanged.
720 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities with an id greater than :since_id.
# An empty-string since_id, or no since_id at all, leaves the query unchanged.
722 defp restrict_since(query, %{since_id: ""}), do: query
724 defp restrict_since(query, %{since_id: since_id}) do
725 from(activity in query, where: activity.id > ^since_id)
728 defp restrict_since(query, _), do: query
# Embedded-tag variant of the :tag_all filter: the object's "tag" JSON must contain all
# of the given tags (jsonb `?&`). Raises when preloading is skipped, since the joined
# object is required.
730 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
731 raise_on_missing_preload()
734 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
736 [_activity, object] in query,
737 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
# A single binary tag is handled by the "any" filter.
741 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
742 restrict_embedded_tag_any(query, %{tag: tag})
745 defp restrict_embedded_tag_all(query, _), do: query
# Embedded-tag variant of the :tag filter: the object's "tag" JSON must contain at least
# one of the given tags (jsonb `?|`). Raises when preloading is skipped.
747 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
748 raise_on_missing_preload()
751 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
753 [_activity, object] in query,
754 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
# A single binary tag is wrapped into a one-element list.
758 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
759 restrict_embedded_tag_any(query, %{tag: [tag]})
762 defp restrict_embedded_tag_any(query, _), do: query
# Embedded-tag variant of the :tag_reject filter: drops objects whose "tag" JSON contains
# any of the given tags (negated jsonb `?|`). Raises when preloading is skipped.
764 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
765 raise_on_missing_preload()
768 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
770 [_activity, object] in query,
771 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
# A single binary tag is wrapped into a one-element list.
775 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
776 when is_binary(tag_reject) do
777 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
780 defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query selecting the distinct object ids linked (through hashtags_objects)
# to any hashtag whose name is in `tags`.
782 defp object_ids_query_for_tags(tags) do
783 from(hto in "hashtags_objects")
784 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
785 |> where([hto, ht], ht.name in ^tags)
786 |> select([hto], hto.object_id)
787 |> distinct([hto], true)
# Hashtag-table variant of the :tag_all filter: the object must be linked to every given
# hashtag. Raises when preloading is skipped.
790 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
791 raise_on_missing_preload()
# With exactly one tag, "all" and "any" are equivalent, so the "any" filter is used.
794 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
795 restrict_hashtag_any(query, %{tag: single_tag})
# Several tags: the aggregated names of the object's matching hashtags must contain (@>)
# the requested set.
# NOTE(review): the fragment arguments are not visible in this excerpt.
798 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
800 [_activity, object] in query,
804 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
805 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
806 AND hashtags_objects.object_id = ?) @> ?
# A single binary tag is wrapped into a one-element list.
815 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
816 restrict_hashtag_all(query, %{tag_all: [tag]})
819 defp restrict_hashtag_all(query, _), do: query
# Hashtag-table variant of the :tag filter: the object must be linked to at least one of
# the given hashtags. Raises when preloading is skipped.
821 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
822 raise_on_missing_preload()
# The hashtag ids are resolved first, then joined through hashtags_objects. The query is
# made distinct on, and ordered by, object id descending.
# NOTE(review): how `hashtag_ids` is materialised is not visible in this excerpt.
825 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
827 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
830 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
832 [_activity, object] in query,
833 join: hto in "hashtags_objects",
834 on: hto.object_id == object.id,
835 where: hto.hashtag_id in ^hashtag_ids,
836 distinct: [desc: object.id],
837 order_by: [desc: object.id]
# A single binary tag is wrapped into a one-element list.
841 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
842 restrict_hashtag_any(query, %{tag: [tag]})
845 defp restrict_hashtag_any(query, _), do: query
# Hashtag-table variant of the :tag_reject filter: drops objects whose id appears in
# object_ids_query_for_tags/1 for the rejected tags. Raises when preloading is skipped.
847 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
848 raise_on_missing_preload()
851 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
853 [_activity, object] in query,
854 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
# A single binary tag is wrapped into a one-element list.
858 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
859 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
862 defp restrict_hashtag_reject_any(query, _), do: query
# Raises the shared error used by filters that need the joined object but were called
# with skip_preload: true.
864 defp raise_on_missing_preload do
865 raise "Can't use the child object without preloading!"
# Restricts to activities whose recipients overlap (`&&`) the given list.
# An empty recipient list means no restriction.
868 defp restrict_recipients(query, [], _user), do: query
# No user: recipient overlap only.
870 defp restrict_recipients(query, recipients, nil) do
871 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: recipient overlap, or the user is the activity's actor.
874 defp restrict_recipients(query, recipients, user) do
877 where: fragment("? && ?", ^recipients, activity.recipients),
878 or_where: activity.actor == ^user.ap_id
# With local_only: true keeps only local activities; otherwise leaves the query unchanged.
882 defp restrict_local(query, %{local_only: true}) do
883 from(activity in query, where: activity.local == true)
886 defp restrict_local(query, _), do: query
# With remote: true keeps only non-local activities; otherwise leaves the query unchanged.
888 defp restrict_remote(query, %{remote: true}) do
889 from(activity in query, where: activity.local == false)
892 defp restrict_remote(query, _), do: query
# With an :actor_id keeps only activities by that actor; otherwise leaves the query unchanged.
894 defp restrict_actor(query, %{actor_id: actor_id}) do
895 from(activity in query, where: activity.actor == ^actor_id)
898 defp restrict_actor(query, _), do: query
# Restricts by activity "type". A binary matches exactly one type; any other value is
# bound as the array operand of `= ANY(?)`. Without :type the query is unchanged.
900 defp restrict_type(query, %{type: type}) when is_binary(type) do
901 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
904 defp restrict_type(query, %{type: type}) do
905 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
908 defp restrict_type(query, _), do: query
# With a :state keeps only activities whose data "state" equals it; otherwise leaves the
# query unchanged.
910 defp restrict_state(query, %{state: state}) do
911 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
914 defp restrict_state(query, _), do: query
# With :favorited_by keeps only activities whose joined object lists that ap_id in its
# "likes" (jsonb `?`); otherwise leaves the query unchanged.
916 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
918 [_activity, object] in query,
919 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
923 defp restrict_favorited_by(query, _), do: query
# With only_media: true keeps only "Create" activities whose object "attachment" differs
# from the empty list. Raises when :only_media is given together with skip_preload: true,
# since the joined object is required.
925 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
926 raise "Can't use the child object without preloading!"
929 defp restrict_media(query, %{only_media: true}) do
931 [activity, object] in query,
932 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
933 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
937 defp restrict_media(query, _), do: query
# Reply filtering.
# exclude_replies: true keeps only objects without an "inReplyTo".
939 defp restrict_replies(query, %{exclude_replies: true}) do
941 [_activity, object] in query,
942 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self": non-replies, plus replies matching the `= ANY` check.
# NOTE(review): the fragment arguments are not visible in this excerpt.
946 defp restrict_replies(query, %{
947 reply_filtering_user: %User{} = user,
948 reply_visibility: "self"
951 [activity, object] in query,
954 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following": see the inline SQL comments for each alternative. The
# list bound below is the user plus their cached friends' ap_ids.
962 defp restrict_replies(query, %{
963 reply_filtering_user: %User{} = user,
964 reply_visibility: "following"
967 [activity, object] in query,
971 ?->>'type' != 'Create' -- This isn't a Create
972 OR ?->>'inReplyTo' is null -- this isn't a reply
973 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
974 -- unless they are the author (because authors
975 -- are also part of the recipients). This leads
976 -- to a bug that self-replies by friends won't
978 OR ? = ? -- The actor is us
982 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
# No reply-related options: query is returned unchanged.
991 defp restrict_replies(query, _), do: query
# With exclude_reblogs: true drops "Announce" activities; otherwise leaves the query unchanged.
993 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
994 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
997 defp restrict_reblogs(query, _), do: query
# Hides activities from actors the muting user has muted. with_muted: true disables the filter.
999 defp restrict_muted(query, %{with_muted: true}), do: query
# The muted ap_ids are taken from opts[:muted_users_ap_ids] when given, otherwise looked
# up for the user. Unless preloading is skipped, rows that have a thread-mute row
# (thread_mute binding) are dropped as well.
# NOTE(review): the arguments of the second fragment are not visible in this excerpt.
1001 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1002 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1005 from([activity] in query,
1006 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1009 "not (?->'to' \\?| ?) or ? = ?",
1017 unless opts[:skip_preload] do
1018 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
# No muting user: query is returned unchanged.
1024 defp restrict_muted(query, _), do: query
# Hides activities involving users or domains blocked by the blocking user. The blocked
# ap_ids are taken from opts[:blocked_users_ap_ids] when given, otherwise looked up.
# The object is joined if the query does not already have an :object binding.
# NOTE(review): most fragment arguments are not visible in this excerpt; the inline
# comments describe the intent of each condition.
1026 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1027 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1028 domain_blocks = user.domain_blocks || []
1030 following_ap_ids = User.get_friends_ap_ids(user)
1033 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1036 [activity, object: o] in query,
1037 # You don't block the author
1038 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1040 # You don't block any recipients, and didn't author the post
1043 "((not (? && ?)) or ? = ?)",
1044 activity.recipients,
1050 # You don't block the domain of any recipients, and didn't author the post
1053 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1054 activity.recipients,
1060 # It's not a boost of a user you block
1063 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1069 # You don't block the author's domain, and also don't follow the author
1072 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1079 # Same as above, but checks the Object
1082 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
# No blocking user: query is returned unchanged.
1091 defp restrict_blocked(query, _), do: query
# Hides activities from, and boosts of, users who block the given user. The behaviour
# depends on the [:activitypub, :blockers_visible] setting.
# NOTE(review): which branch of the `if` applies the filter is not visible in this
# excerpt (presumably the filter runs when blockers_visible is not true).
1093 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1094 if Config.get([:activitypub, :blockers_visible]) == true do
1097 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1101 # The author doesn't block you
1102 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1104 # It's not a boost of a user that blocks you
1107 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
# No blocking user: query is returned unchanged.
1116 defp restrict_blockers_visibility(query, _), do: query
# With restrict_unlisted: true drops activities that list the public address in "cc"
# (a missing "cc" is coalesced to an empty JSON value); otherwise leaves the query unchanged.
# NOTE(review): the first fragment argument is not visible in this excerpt.
1118 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1123 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1125 ^[Constants.as_public()]
1130 defp restrict_unlisted(query, _), do: query
# With pinned: true and :pinned_object_ids keeps only "Create" activities whose object id
# (embedded object id, or the plain "object" string) is in the given list; otherwise
# leaves the query unchanged.
# NOTE(review): the fragment arguments are not visible in this excerpt.
1132 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1134 [activity, object: o] in query,
1137 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1146 defp restrict_pinned(query, _), do: query
# Hides "Announce" activities matched against the users whose reblogs the muting user has
# muted. The ap_ids are taken from opts[:reblog_muted_users_ap_ids] when given, otherwise
# looked up for the user.
# NOTE(review): the fragment arguments are not visible in this excerpt.
1148 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1149 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1155 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
# No muting user: query is returned unchanged.
1163 defp restrict_muted_reblogs(query, _), do: query
# Restricts to activities whose actor URL host (third '/'-separated part of `actor`)
# equals the given instance, or is one of the given instances when a list is passed.
# Without :instance the query is unchanged.
1165 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1168 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1172 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1175 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1179 defp restrict_instance(query, _), do: query
# Applies the user's content filters: drops activities whose object "content" matches the
# composed regex (case-insensitive `~*`), except for the user's own activities.
# NOTE(review): the case branches around the regex are only partly visible in this excerpt.
1181 defp restrict_filtered(query, %{user: %User{} = user}) do
1182 case Filter.compose_regex(user) do
1187 from([activity, object] in query,
1189 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1190 activity.actor == ^user.ap_id
# When only a blocking_user is given, that user's filters are applied.
1195 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1196 restrict_filtered(query, %{user: user})
1199 defp restrict_filtered(query, _), do: query
# Drops poll votes (objects of type "Answer") unless include_poll_votes: true is given.
# The filter is applied when the query has an :object binding.
# NOTE(review): the branch for a query without that binding is not visible in this excerpt.
1201 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1203 defp exclude_poll_votes(query, _) do
1204 if has_named_binding?(query, :object) do
1205 from([activity, object: o] in query,
1206 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Drops activities by invisible users unless invisible_actors: true is given.
# The invisible users' ap_ids are collected first and bound into the query as a list.
# NOTE(review): the step that runs the user query is not visible in this excerpt.
1213 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1215 defp exclude_invisible_actors(query, _opts) do
1217 User.Query.build(%{invisible: true, select: [:ap_id]})
1219 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1221 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id drops the activity with that id; otherwise leaves the query unchanged.
1224 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1225 from(activity in query, where: activity.id != ^id)
1228 defp exclude_id(query, _), do: query
# Preloads the object of each activity unless skip_preload: true is given.
1230 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1232 defp maybe_preload_objects(query, _) do
1234 |> Activity.with_preloaded_object()
# Preloads the bookmark of opts[:user] for each activity unless skip_preload: true is given.
1237 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1239 defp maybe_preload_bookmarks(query, opts) do
1241 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes only when preload_report_notes: true is given.
1244 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1246 |> Activity.with_preloaded_report_notes()
1249 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for opts[:muting_user], falling back to opts[:user],
# unless skip_preload: true is given.
1251 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1253 defp maybe_set_thread_muted_field(query, opts) do
1255 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id descending or ascending when opts[:order] is :desc or :asc; any other
# value leaves the query unordered.
1258 defp maybe_order(query, %{order: :desc}) do
1260 |> order_by(desc: :id)
1263 defp maybe_order(query, %{order: :asc}) do
1265 |> order_by(asc: :id)
1268 defp maybe_order(query, _), do: query
# Normalizes the hashtag names given under :tag, :tag_all and :tag_reject with
# Hashtag.normalize_name/1, for both single values and lists.
# NOTE(review): further list processing and the fallback branch are not visible in this excerpt.
1270 defp normalize_fetch_activities_query_opts(opts) do
1271 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1273 value when is_bitstring(value) ->
1274 Map.put(opts, key, Hashtag.normalize_name(value))
1276 value when is_list(value) ->
1279 |> Enum.map(&Hashtag.normalize_name/1)
1282 Map.put(opts, key, normalized_value)
# Loads the muting user's outgoing relationship ap_ids with a single call and injects them
# into the opts used by restrict_blocked/2, restrict_muted/2 and restrict_muted_reblogs/2.
# Blocks are included only when blocking_user is the same user as muting_user.
# opts is the second argument of Map.merge/2, so a value already present in opts wins
# over the preloaded one.
# NOTE(review): the else branch of the blocking_user check is not visible in this excerpt.
1290 defp fetch_activities_query_ap_ids_ops(opts) do
1291 source_user = opts[:muting_user]
1292 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1294 ap_id_relationships =
1295 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1296 [:block | ap_id_relationships]
1301 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1303 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1304 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1306 restrict_muted_reblogs_opts =
1307 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1309 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query for `recipients`: normalizes the tag options, preloads
# relationship ap_ids once, then applies every preload, restriction and exclusion below.
# Each step leaves the query unchanged when its option is absent.
# Tag filtering uses the hashtag-table filters when the :improved_hashtag_timeline feature
# is enabled, and the embedded-tag filters otherwise.
1312 def fetch_activities_query(recipients, opts \\ %{}) do
1313 opts = normalize_fetch_activities_query_opts(opts)
1315 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1316 fetch_activities_query_ap_ids_ops(opts)
1319 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1324 |> maybe_preload_objects(opts)
1325 |> maybe_preload_bookmarks(opts)
1326 |> maybe_preload_report_notes(opts)
1327 |> maybe_set_thread_muted_field(opts)
1328 |> maybe_order(opts)
1329 |> restrict_recipients(recipients, opts[:user])
1330 |> restrict_replies(opts)
1331 |> restrict_since(opts)
1332 |> restrict_local(opts)
1333 |> restrict_remote(opts)
1334 |> restrict_actor(opts)
1335 |> restrict_type(opts)
1336 |> restrict_state(opts)
1337 |> restrict_favorited_by(opts)
1338 |> restrict_blocked(restrict_blocked_opts)
1339 |> restrict_blockers_visibility(opts)
1340 |> restrict_muted(restrict_muted_opts)
1341 |> restrict_filtered(opts)
1342 |> restrict_media(opts)
1343 |> restrict_visibility(opts)
1344 |> restrict_thread_visibility(opts, config)
1345 |> restrict_reblogs(opts)
1346 |> restrict_pinned(opts)
1347 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1348 |> restrict_instance(opts)
1349 |> restrict_announce_object_actor(opts)
# NOTE(review): restrict_filtered/2 is already applied earlier in this pipeline with the
# same opts; this second call looks redundant and adds the same condition twice.
1350 |> restrict_filtered(opts)
1351 |> Activity.restrict_deactivated_users()
1352 |> exclude_poll_votes(opts)
1353 |> exclude_invisible_actors(opts)
1354 |> exclude_visibility(opts)
1356 if Config.feature_enabled?(:improved_hashtag_timeline) do
1358 |> restrict_hashtag_any(opts)
1359 |> restrict_hashtag_all(opts)
1360 |> restrict_hashtag_reject_any(opts)
1363 |> restrict_embedded_tag_any(opts)
1364 |> restrict_embedded_tag_all(opts)
1365 |> restrict_embedded_tag_reject_any(opts)
@doc """
Fetches the activities a user has favourited.

Starts from the user's `Like` activities, joins the liked object and its
activity, and returns that activity with `object` filled in and
`pagination_id` set to the id of the `Like`. Results are ordered by the
`Like` id, descending, so pagination follows the order in which favourites
were added rather than the order of the favourited activities.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is set explicitly above, so pagination must not add its own.
  Pagination.fetch_paginated(likes_query, Map.put(params, :skip_order, true), pagination)
end
# Adds the user's AP id to the "cc" of every activity whose "bcc" contains at
# least one of the given list memberships.
#
# Activities without a non-empty "bcc" list, or whose "bcc" shares nothing
# with `list_memberships`, are returned unchanged. When there are no list
# memberships (or no user) the activities are returned as-is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `List.wrap/1` keeps the result a proper list even when "cc" is
        # missing (nil) or holds a single non-list value.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Restricts `query` to activities whose recipients overlap `recipients`, or
# that overlap `recipients_with_public` and are also addressed to the public
# collection (`Constants.as_public/0`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
@doc """
Fetches a page of activities bounded by two recipient sets.

The base query is built with an empty recipient list; the recipient
restriction itself comes from `fetch_activities_bounded_query/3`. The
paginated result is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is given it is recorded under the `"actor"` key of the
object data. Any non-`{:ok, data}` result of the store step is returned
unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      %Object{data: Maps.put_if_present(data, "actor", opts[:actor])}
      |> Repo.insert()

    failure ->
      failure
  end
end
# Extracts a profile URL from the actor's "url" field.
#
# Accepts a plain string, a map with a string "href", or a list of either
# (only the first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    href when is_binary(href) -> href
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
# Normalizes an actor image ("icon" / "image") into the internal shape
# `%{"type" => "Image", "url" => [%{"href" => url}]}`.
#
# A list is reduced to its first element; values without a "url" key yield nil.
defp normalize_image(image) do
  case image do
    %{"url" => url} -> %{"type" => "Image", "url" => [%{"href" => url}]}
    [first | _] -> normalize_image(first)
    _ -> nil
  end
end
# Converts a (remote) ActivityPub actor object into the attribute map used to
# build or update a user.
#
# `additional[:nickname_from_acct]`, when present, is used as the nickname;
# otherwise the nickname is derived through `generate_nickname/1`.
#
# The actor document comes from a remote server, so profile fields and custom
# emoji are extracted leniently: entries that do not have the expected shape
# are skipped instead of raising, and a missing or non-list "attachment"/"tag"
# is tolerated.
defp object_to_user_data(data, additional) do
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(Map.get(data, "tag")),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # Read before `maybe_fix_user_object/1` rewrites the actor object.
  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
# Derives a nickname of the form `preferredUsername@host` from the actor data.
#
# When `[WebFinger, :update_nickname_on_user_fetch]` is enabled, the derived
# name is looked up through WebFinger and the returned `acct:` subject is used
# instead; if that lookup does not yield a subject, the derived name is kept.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  generated = "#{username}@#{URI.parse(data["id"]).host}"
  lookup_enabled? = Config.get([WebFinger, :update_nickname_on_user_fetch])

  # `&&` short-circuits, so WebFinger is only queried when the option is set.
  case lookup_enabled? && WebFinger.finger(generated) do
    {:ok, %{"subject" => "acct:" <> acct}} -> acct
    _ -> generated
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_), do: nil
@doc """
Fetches the remote following and follower collections of `user` and derives
the follow counters and visibility flags from them.

Returns `{:ok, info}` where `info` contains `:hide_follows`,
`:hide_followers`, `:follower_count` and `:following_count`. Counters that
are not integers are reported as `0`. Any failure is returned as
`{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"])
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
@doc """
Merges freshly fetched follow information into `user_data[:info]` when
external user synchronization applies.

The update is attempted only if `[:instance, :external_user_synchronization]`
is `true`, `user_data[:type]` is `"Person"` or `"Service"`, and both the
following and follower addresses are set. When one of these checks is false,
`user_data` is returned unchanged. Any other failure is logged as an error
and `user_data` is likewise returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, info))
  else
    # A precondition was not met: nothing to update, nothing to report.
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
# Decides whether a follow collection is hidden.
#
# Returns `{:ok, false}` when the first page is embedded, or can be fetched,
# and is a (Ordered)CollectionPage. Returns `{:ok, true}` when the collection
# has no "first" entry or fetching the first page is answered with 401/403.
# Every other outcome is returned as `{:error, reason}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user data.

Returns `{:ok, user_data}`. Any result of the filter other than `{:ok, data}`
is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    rejection -> {:error, rejection}
  end
end
@doc """
Fetches the remote actor at `ap_id` and prepares the user data for it,
including follow information where applicable.

Returns `{:ok, user_data}` or `{:error, reason}`. Failures are logged with a
severity that depends on the reason: debug for deleted objects, info for
rejections, error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, e} ->
      case e do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

        {:reject, reason} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      end

      {:error, e}
  end
end
@doc """
Renames an existing user that already holds the nickname in `data` but has a
different AP id.

The existing user's nickname is changed to `"<id>.<nickname>"` so the
nickname becomes free for the new user. If the existing user has the same
AP id, nothing is changed and the situation is only logged. Without a string
nickname or without an existing user this is a no-op returning `nil`.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    changeset =
      User.remote_user_changeset(old_user, %{nickname: "#{old_user.id}.#{old_user.nickname}"})

    User.update_and_set_cache(changeset)
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
@doc """
Turns a featured (pinned posts) collection into a map of
`object_ap_id => NaiveDateTime.utc_now()`.

A paged `OrderedCollection` (one with a `"first"` entry) has its first page
fetched and the page's `"orderedItems"` used; if that fetch fails the error
is logged and an empty map is returned. Other `OrderedCollection` /
`Collection` objects are resolved through
`Collections.Fetcher.fetch_collection/1`.

In both cases items may be either AP id strings or objects carrying an
`"id"`. A page without `"orderedItems"` yields an empty map.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
    page
    |> Map.get("orderedItems")
    # A page without items is treated as empty rather than crashing on nil.
    |> List.wrap()
    |> Map.new(&featured_item_to_pin/1)
  else
    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  {:ok, objects} = Collections.Fetcher.fetch_collection(collection)

  Map.new(objects, &featured_item_to_pin/1)
end

# Items can either be a map _or_ a string
defp featured_item_to_pin(ap_id) when is_binary(ap_id), do: {ap_id, NaiveDateTime.utc_now()}

defp featured_item_to_pin(%{"id" => object_ap_id}),
  do: {object_ap_id, NaiveDateTime.utc_now()}
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pins}` with the
pin data extracted from it.

Always returns an `{:ok, map}` tuple: a `nil` address or a failed fetch
yields `{:ok, %{}}` (the latter after logging an error).
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      {:ok, pin_data_from_featured_collection(data)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
@doc """
Makes sure every pinned object of the given user data is available, fetching
the ones that are not cached.

Returns `:ok` when every pin is cached or was fetched successfully, `:error`
otherwise (checking stops at the first pin that cannot be obtained), and
`nil` when given `nil`.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  available? = fn {ap_id, _} ->
    Object.get_cached_by_ap_id(ap_id) ||
      match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  end

  if Enum.all?(pins, available?), do: :ok, else: :error
end
@doc """
Creates or refreshes the user for the remote actor at `ap_id`.

A cached user that is not yet AP-enabled is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched;
a cached user is updated from the fetched data, while an unknown user is
inserted after any user holding the same nickname has been renamed. Fetching
of pinned objects is started in a separate task.

If fetching the actor fails, the failure is returned unchanged.
"""
def make_user_from_ap_id(ap_id, additional \\ []) do
  user = User.get_cached_by_ap_id(ap_id)

  cond do
    user && !User.ap_enabled?(user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

        case user do
          missing when missing in [nil, false] ->
            # Free the nickname first so the insert below cannot clash.
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()

          cached ->
            cached
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
        end
      end
  end
end
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user for
the AP id found there.

The `acct:` subject from the WebFinger response is passed on as
`:nickname_from_acct`. Returns `{:error, "No AP id in WebFinger"}` when the
lookup does not provide both an AP id and an `acct:` subject.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns whether the whole thread of `activity`
# is visible to `user`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing check for a single activity; currently this only applies the
# broken-thread containment.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1761 def fetch_direct_messages_query do
1763 |> restrict_type(%{type: "Create"})
1764 |> restrict_visibility(%{visibility: "direct"})
1765 |> order_by([activity], asc: activity.id)