1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
68 defp check_actor_can_insert(_), do: true
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
75 defp check_remote_limit(_), do: true
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
98 defp increase_replies_count_if_reply(_create_data), do: :noop
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
113 Repo.insert(%Activity{
116 recipients: recipients,
117 actor: object["actor"]
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
149 %Activity{} = activity ->
155 {:containment, _} = error ->
158 {:error, _} = error ->
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
166 recipients: recipients,
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
181 defp insert_activity_with_expiration(data, local, recipients) do
185 actor: data["actor"],
186 recipients: recipients
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
200 stream_out_participations(participations)
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
225 defp get_participations({:ok, conversation}) do
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
231 defp get_participations(_), do: []
233 def stream_out_participations(participations) do
236 |> Repo.preload(:user)
238 Streamer.stream("participation", participations)
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
259 def stream_out_participations(_, _), do: :noop
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
270 def stream_out(_activity) do
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
290 %{to: to, actor: actor, published: published, context: context, object: object},
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
305 {:quick_insert, true, activity} ->
308 {:fake, true, activity} ->
312 Repo.rollback(message)
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 defp do_unfollow(follower, followed, activity_id, local)
332 defp do_unfollow(follower, followed, activity_id, local) when local == true do
333 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
334 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
335 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
336 {:ok, activity} <- insert(unfollow_data, local),
337 _ <- notify_and_stream(activity),
338 :ok <- maybe_federate(activity) do
342 {:error, error} -> Repo.rollback(error)
346 defp do_unfollow(follower, followed, activity_id, false) do
347 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
348 # uses deterministic ids for follows.
349 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
350 {:ok, _activity} <- Repo.delete(follow_activity),
351 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
352 unfollow_activity <- remote_unfollow_data(unfollow_data),
353 _ <- notify_and_stream(unfollow_activity) do
354 {:ok, unfollow_activity}
357 {:error, error} -> Repo.rollback(error)
361 defp remote_unfollow_data(data) do
362 {recipients, _, _} = get_recipients(data)
367 actor: data["actor"],
368 recipients: recipients
372 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
374 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
388 # only accept false as false value
389 local = !(params[:local] == false)
390 forward = !(params[:forward] == false)
392 additional = params[:additional] || %{}
396 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
398 Map.merge(additional, %{"to" => [], "cc" => []})
401 with flag_data <- make_flag_data(params, additional),
402 {:ok, activity} <- insert(flag_data, local),
403 {:ok, stripped_activity} <- strip_report_status_data(activity),
404 _ <- notify_and_stream(activity),
406 maybe_federate(stripped_activity) do
407 User.all_superusers()
408 |> Enum.filter(fn user -> user.ap_id != actor end)
409 |> Enum.filter(fn user -> not is_nil(user.email) end)
410 |> Enum.each(fn superuser ->
412 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
413 |> Pleroma.Emails.Mailer.deliver_async()
418 {:error, error} -> Repo.rollback(error)
422 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
423 def move(%User{} = origin, %User{} = target, local \\ true) do
426 "actor" => origin.ap_id,
427 "object" => origin.ap_id,
428 "target" => target.ap_id,
429 "to" => [origin.follower_address]
432 with true <- origin.ap_id in target.also_known_as,
433 {:ok, activity} <- insert(params, local),
434 _ <- notify_and_stream(activity) do
435 maybe_federate(activity)
437 BackgroundWorker.enqueue("move_following", %{
438 "origin_id" => origin.id,
439 "target_id" => target.id
444 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
449 def fetch_activities_for_context_query(context, opts) do
450 public = [Constants.as_public()]
454 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
457 from(activity in Activity)
458 |> maybe_preload_objects(opts)
459 |> maybe_preload_bookmarks(opts)
460 |> maybe_set_thread_muted_field(opts)
461 |> restrict_blocked(opts)
462 |> restrict_blockers_visibility(opts)
463 |> restrict_recipients(recipients, opts[:user])
464 |> restrict_filtered(opts)
468 "?->>'type' = ? and ?->>'context' = ?",
475 |> exclude_poll_votes(opts)
477 |> order_by([activity], desc: activity.id)
480 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
481 def fetch_activities_for_context(context, opts \\ %{}) do
483 |> fetch_activities_for_context_query(opts)
487 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
488 FlakeId.Ecto.CompatType.t() | nil
489 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
491 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
492 |> restrict_visibility(%{visibility: "direct"})
498 defp fetch_paginated_optimized(query, opts, pagination) do
499 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
500 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
501 opts = Map.put(opts, :skip_extra_order, true)
503 Pagination.fetch_paginated(query, opts, pagination)
506 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
507 list_memberships = Pleroma.List.memberships(opts[:user])
509 fetch_activities_query(recipients ++ list_memberships, opts)
510 |> fetch_paginated_optimized(opts, pagination)
512 |> maybe_update_cc(list_memberships, opts[:user])
515 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
516 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
517 includes_local_public = Map.get(opts, :includes_local_public, false)
519 opts = Map.delete(opts, :user)
521 intended_recipients =
522 if includes_local_public do
523 [Constants.as_public(), as_local_public()]
525 [Constants.as_public()]
529 |> fetch_activities_query(opts)
530 |> restrict_unlisted(opts)
531 |> fetch_paginated_optimized(opts, pagination)
534 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
535 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
537 |> Map.put(:restrict_unlisted, true)
538 |> fetch_public_or_unlisted_activities(pagination)
541 @valid_visibilities ~w[direct unlisted public private]
543 defp restrict_visibility(query, %{visibility: visibility})
544 when is_list(visibility) do
545 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
550 "activity_visibility(?, ?, ?) = ANY (?)",
558 Logger.error("Could not restrict visibility to #{visibility}")
562 defp restrict_visibility(query, %{visibility: visibility})
563 when visibility in @valid_visibilities do
567 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
571 defp restrict_visibility(_query, %{visibility: visibility})
572 when visibility not in @valid_visibilities do
573 Logger.error("Could not restrict visibility to #{visibility}")
576 defp restrict_visibility(query, _visibility), do: query
578 defp exclude_visibility(query, %{exclude_visibilities: visibility})
579 when is_list(visibility) do
580 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
585 "activity_visibility(?, ?, ?) = ANY (?)",
593 Logger.error("Could not exclude visibility to #{visibility}")
598 defp exclude_visibility(query, %{exclude_visibilities: visibility})
599 when visibility in @valid_visibilities do
604 "activity_visibility(?, ?, ?) = ?",
613 defp exclude_visibility(query, %{exclude_visibilities: visibility})
614 when visibility not in [nil | @valid_visibilities] do
615 Logger.error("Could not exclude visibility to #{visibility}")
619 defp exclude_visibility(query, _visibility), do: query
621 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
624 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
627 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
628 local_public = as_local_public()
632 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
636 defp restrict_thread_visibility(query, _, _), do: query
638 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
641 |> Map.put(:user, reading_user)
642 |> Map.put(:actor_id, user.ap_id)
645 godmode: params[:godmode],
646 reading_user: reading_user
648 |> user_activities_recipients()
649 |> fetch_activities(params)
653 def fetch_user_activities(user, reading_user, params \\ %{})
655 def fetch_user_activities(user, reading_user, %{total: true} = params) do
656 result = fetch_activities_for_user(user, reading_user, params)
658 Keyword.put(result, :items, Enum.reverse(result[:items]))
661 def fetch_user_activities(user, reading_user, params) do
663 |> fetch_activities_for_user(reading_user, params)
667 defp fetch_activities_for_user(user, reading_user, params) do
670 |> Map.put(:type, ["Create", "Announce"])
671 |> Map.put(:user, reading_user)
672 |> Map.put(:actor_id, user.ap_id)
673 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
676 if User.blocks?(reading_user, user) do
680 |> Map.put(:blocking_user, reading_user)
681 |> Map.put(:muting_user, reading_user)
684 pagination_type = Map.get(params, :pagination_type) || :keyset
687 godmode: params[:godmode],
688 reading_user: reading_user
690 |> user_activities_recipients()
691 |> fetch_activities(params, pagination_type)
694 def fetch_statuses(reading_user, %{total: true} = params) do
695 result = fetch_activities_for_reading_user(reading_user, params)
696 Keyword.put(result, :items, Enum.reverse(result[:items]))
699 def fetch_statuses(reading_user, params) do
701 |> fetch_activities_for_reading_user(params)
705 defp fetch_activities_for_reading_user(reading_user, params) do
706 params = Map.put(params, :type, ["Create", "Announce"])
709 godmode: params[:godmode],
710 reading_user: reading_user
712 |> user_activities_recipients()
713 |> fetch_activities(params, :offset)
716 defp user_activities_recipients(%{godmode: true}), do: []
718 defp user_activities_recipients(%{reading_user: reading_user}) do
719 if not is_nil(reading_user) and reading_user.local do
721 Constants.as_public(),
723 reading_user.ap_id | User.following(reading_user)
726 [Constants.as_public()]
730 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
731 raise "Can't use the child object without preloading!"
734 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
736 [activity, object] in query,
739 "?->>'type' != ? or ?->>'actor' != ?",
748 defp restrict_announce_object_actor(query, _), do: query
750 defp restrict_since(query, %{since_id: ""}), do: query
752 defp restrict_since(query, %{since_id: since_id}) do
753 from(activity in query, where: activity.id > ^since_id)
756 defp restrict_since(query, _), do: query
758 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
759 raise_on_missing_preload()
762 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
764 [_activity, object] in query,
765 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
769 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
770 restrict_embedded_tag_any(query, %{tag: tag})
773 defp restrict_embedded_tag_all(query, _), do: query
775 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
776 raise_on_missing_preload()
779 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
781 [_activity, object] in query,
782 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
786 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
787 restrict_embedded_tag_any(query, %{tag: [tag]})
790 defp restrict_embedded_tag_any(query, _), do: query
792 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
793 raise_on_missing_preload()
796 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
798 [_activity, object] in query,
799 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
803 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
804 when is_binary(tag_reject) do
805 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
808 defp restrict_embedded_tag_reject_any(query, _), do: query
810 defp object_ids_query_for_tags(tags) do
811 from(hto in "hashtags_objects")
812 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
813 |> where([hto, ht], ht.name in ^tags)
814 |> select([hto], hto.object_id)
815 |> distinct([hto], true)
818 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
819 raise_on_missing_preload()
822 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
823 restrict_hashtag_any(query, %{tag: single_tag})
826 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
828 [_activity, object] in query,
832 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
833 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
834 AND hashtags_objects.object_id = ?) @> ?
843 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
844 restrict_hashtag_all(query, %{tag_all: [tag]})
847 defp restrict_hashtag_all(query, _), do: query
849 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
850 raise_on_missing_preload()
853 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
855 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
858 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
860 [_activity, object] in query,
861 join: hto in "hashtags_objects",
862 on: hto.object_id == object.id,
863 where: hto.hashtag_id in ^hashtag_ids,
864 distinct: [desc: object.id],
865 order_by: [desc: object.id]
869 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
870 restrict_hashtag_any(query, %{tag: [tag]})
873 defp restrict_hashtag_any(query, _), do: query
875 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
876 raise_on_missing_preload()
879 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
881 [_activity, object] in query,
882 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
886 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
887 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
890 defp restrict_hashtag_reject_any(query, _), do: query
892 defp raise_on_missing_preload do
893 raise "Can't use the child object without preloading!"
896 defp restrict_recipients(query, [], _user), do: query
898 defp restrict_recipients(query, recipients, nil) do
899 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
902 defp restrict_recipients(query, recipients, user) do
905 where: fragment("? && ?", ^recipients, activity.recipients),
906 or_where: activity.actor == ^user.ap_id
910 defp restrict_local(query, %{local_only: true}) do
911 from(activity in query, where: activity.local == true)
914 defp restrict_local(query, _), do: query
916 defp restrict_remote(query, %{remote: true}) do
917 from(activity in query, where: activity.local == false)
920 defp restrict_remote(query, _), do: query
922 defp restrict_actor(query, %{actor_id: actor_id}) do
923 from(activity in query, where: activity.actor == ^actor_id)
926 defp restrict_actor(query, _), do: query
928 defp restrict_type(query, %{type: type}) when is_binary(type) do
929 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
932 defp restrict_type(query, %{type: type}) do
933 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
936 defp restrict_type(query, _), do: query
938 defp restrict_state(query, %{state: state}) do
939 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
942 defp restrict_state(query, _), do: query
944 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
946 [_activity, object] in query,
947 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
951 defp restrict_favorited_by(query, _), do: query
953 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
954 raise "Can't use the child object without preloading!"
957 defp restrict_media(query, %{only_media: true}) do
959 [activity, object] in query,
960 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
961 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
965 defp restrict_media(query, _), do: query
967 defp restrict_replies(query, %{exclude_replies: true}) do
969 [_activity, object] in query,
970 where: fragment("?->>'inReplyTo' is null", object.data)
974 defp restrict_replies(query, %{
975 reply_filtering_user: %User{} = user,
976 reply_visibility: "self"
979 [activity, object] in query,
982 "?->>'inReplyTo' is null OR ? = ANY(?)",
990 defp restrict_replies(query, %{
991 reply_filtering_user: %User{} = user,
992 reply_visibility: "following"
995 [activity, object] in query,
999 ?->>'type' != 'Create' -- This isn't a Create
1000 OR ?->>'inReplyTo' is null -- this isn't a reply
1001 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1002 -- unless they are the author (because authors
1003 -- are also part of the recipients). This leads
1004 -- to a bug that self-replies by friends won't
1006 OR ? = ? -- The actor is us
1010 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1011 activity.recipients,
1019 defp restrict_replies(query, _), do: query
1021 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1022 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1025 defp restrict_reblogs(query, _), do: query
1027 defp restrict_muted(query, %{with_muted: true}), do: query
1029 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1030 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1033 from([activity] in query,
1034 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1037 "not (?->'to' \\?| ?) or ? = ?",
1045 unless opts[:skip_preload] do
1046 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1052 defp restrict_muted(query, _), do: query
1054 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1055 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1056 domain_blocks = user.domain_blocks || []
1058 following_ap_ids = User.get_friends_ap_ids(user)
1061 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1064 [activity, object: o] in query,
1065 # You don't block the author
1066 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1068 # You don't block any recipients, and didn't author the post
1071 "((not (? && ?)) or ? = ?)",
1072 activity.recipients,
1078 # You don't block the domain of any recipients, and didn't author the post
1081 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1082 activity.recipients,
1088 # It's not a boost of a user you block
1091 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1097 # You don't block the author's domain, and also don't follow the author
1100 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1107 # Same as above, but checks the Object
1110 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1119 defp restrict_blocked(query, _), do: query
1121 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1122 if Config.get([:activitypub, :blockers_visible]) == true do
1125 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1129 # The author doesn't block you
1130 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1132 # It's not a boost of a user that blocks you
1135 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1144 defp restrict_blockers_visibility(query, _), do: query
1146 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1151 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1153 ^[Constants.as_public()]
1158 defp restrict_unlisted(query, _), do: query
1160 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1162 [activity, object: o] in query,
1165 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1174 defp restrict_pinned(query, _), do: query
1176 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1177 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1183 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1191 defp restrict_muted_reblogs(query, _), do: query
1193 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1196 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1200 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1203 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1207 defp restrict_instance(query, _), do: query
1209 defp restrict_filtered(query, %{user: %User{} = user}) do
1210 case Filter.compose_regex(user) do
1215 from([activity, object] in query,
1217 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1218 activity.actor == ^user.ap_id
1223 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1224 restrict_filtered(query, %{user: user})
1227 defp restrict_filtered(query, _), do: query
1229 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1231 defp exclude_poll_votes(query, _) do
1232 if has_named_binding?(query, :object) do
1233 from([activity, object: o] in query,
1234 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1241 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1243 defp exclude_invisible_actors(query, _opts) do
1245 User.Query.build(%{invisible: true, select: [:ap_id]})
1247 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1249 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1252 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1253 from(activity in query, where: activity.id != ^id)
1256 defp exclude_id(query, _), do: query
1258 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1260 defp maybe_preload_objects(query, _) do
1262 |> Activity.with_preloaded_object()
1265 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1267 defp maybe_preload_bookmarks(query, opts) do
1269 |> Activity.with_preloaded_bookmark(opts[:user])
1272 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1274 |> Activity.with_preloaded_report_notes()
1277 defp maybe_preload_report_notes(query, _), do: query
1279 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1281 defp maybe_set_thread_muted_field(query, opts) do
1283 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1286 defp maybe_order(query, %{order: :desc}) do
1288 |> order_by(desc: :id)
1291 defp maybe_order(query, %{order: :asc}) do
1293 |> order_by(asc: :id)
1296 defp maybe_order(query, _), do: query
1298 defp normalize_fetch_activities_query_opts(opts) do
1299 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1301 value when is_bitstring(value) ->
1302 Map.put(opts, key, Hashtag.normalize_name(value))
1304 value when is_list(value) ->
1307 |> Enum.map(&Hashtag.normalize_name/1)
1310 Map.put(opts, key, normalized_value)
1318 defp fetch_activities_query_ap_ids_ops(opts) do
1319 source_user = opts[:muting_user]
1320 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1322 ap_id_relationships =
1323 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1324 [:block | ap_id_relationships]
1329 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1331 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1332 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1334 restrict_muted_reblogs_opts =
1335 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1337 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1340 def fetch_activities_query(recipients, opts \\ %{}) do
1341 opts = normalize_fetch_activities_query_opts(opts)
1343 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1344 fetch_activities_query_ap_ids_ops(opts)
1347 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1352 |> maybe_preload_objects(opts)
1353 |> maybe_preload_bookmarks(opts)
1354 |> maybe_preload_report_notes(opts)
1355 |> maybe_set_thread_muted_field(opts)
1356 |> maybe_order(opts)
1357 |> restrict_recipients(recipients, opts[:user])
1358 |> restrict_replies(opts)
1359 |> restrict_since(opts)
1360 |> restrict_local(opts)
1361 |> restrict_remote(opts)
1362 |> restrict_actor(opts)
1363 |> restrict_type(opts)
1364 |> restrict_state(opts)
1365 |> restrict_favorited_by(opts)
1366 |> restrict_blocked(restrict_blocked_opts)
1367 |> restrict_blockers_visibility(opts)
1368 |> restrict_muted(restrict_muted_opts)
1369 |> restrict_filtered(opts)
1370 |> restrict_media(opts)
1371 |> restrict_visibility(opts)
1372 |> restrict_thread_visibility(opts, config)
1373 |> restrict_reblogs(opts)
1374 |> restrict_pinned(opts)
1375 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1376 |> restrict_instance(opts)
1377 |> restrict_announce_object_actor(opts)
1378 |> restrict_filtered(opts)
1379 |> Activity.restrict_deactivated_users()
1380 |> exclude_poll_votes(opts)
1381 |> exclude_invisible_actors(opts)
1382 |> exclude_visibility(opts)
1384 if Config.feature_enabled?(:improved_hashtag_timeline) do
1386 |> restrict_hashtag_any(opts)
1387 |> restrict_hashtag_all(opts)
1388 |> restrict_hashtag_reject_any(opts)
1391 |> restrict_embedded_tag_any(opts)
1392 |> restrict_embedded_tag_all(opts)
1393 |> restrict_embedded_tag_reject_any(opts)
1398 Fetch favorites activities of user with order by sort adds to favorites
1400 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1401 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1403 |> Activity.Queries.by_actor()
1404 |> Activity.Queries.by_type("Like")
1405 |> Activity.with_joined_object()
1406 |> Object.with_joined_activity()
1407 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1408 |> order_by([like, _, _], desc_nulls_last: like.id)
1409 |> Pagination.fetch_paginated(
1410 Map.merge(params, %{skip_order: true}),
1415 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1416 Enum.map(activities, fn
1417 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1418 if Enum.any?(bcc, &(&1 in list_memberships)) do
1419 update_in(activity.data["cc"], &[user_ap_id | &1])
1429 defp maybe_update_cc(activities, _, _), do: activities
1431 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1432 from(activity in query,
1434 fragment("? && ?", activity.recipients, ^recipients) or
1435 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1436 ^Constants.as_public() in activity.recipients)
1440 def fetch_activities_bounded(
1442 recipients_with_public,
1444 pagination \\ :keyset
1446 fetch_activities_query([], opts)
1447 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1448 |> Pagination.fetch_paginated(opts, pagination)
1452 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1453 def upload(file, opts \\ []) do
1454 with {:ok, data} <- Upload.store(file, opts) do
1455 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1457 Repo.insert(%Object{data: obj_data})
1461 @spec get_actor_url(any()) :: binary() | nil
1462 defp get_actor_url(url) when is_binary(url), do: url
1463 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1465 defp get_actor_url(url) when is_list(url) do
1471 defp get_actor_url(_url), do: nil
1473 defp normalize_image(%{"url" => url}) do
1476 "url" => [%{"href" => url}]
1480 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1481 defp normalize_image(_), do: nil
1483 defp object_to_user_data(data, additional) do
1486 |> Map.get("attachment", [])
1487 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1488 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1492 |> Map.get("tag", [])
1494 %{"type" => "Emoji"} -> true
1497 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1498 {String.trim(name, ":"), url}
1501 is_locked = data["manuallyApprovesFollowers"] || false
1502 data = Transmogrifier.maybe_fix_user_object(data)
1503 is_discoverable = data["discoverable"] || false
1504 invisible = data["invisible"] || false
1505 actor_type = data["type"] || "Person"
1507 featured_address = data["featured"]
1508 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1511 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1512 data["publicKey"]["publicKeyPem"]
1516 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1517 data["endpoints"]["sharedInbox"]
1520 # if WebFinger request was already done, we probably have acct, otherwise
1521 # we request WebFinger here
1522 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1526 uri: get_actor_url(data["url"]),
1528 banner: normalize_image(data["image"]),
1531 is_locked: is_locked,
1532 is_discoverable: is_discoverable,
1533 invisible: invisible,
1534 avatar: normalize_image(data["icon"]),
1536 follower_address: data["followers"],
1537 following_address: data["following"],
1538 featured_address: featured_address,
1539 bio: data["summary"] || "",
1540 actor_type: actor_type,
1541 also_known_as: Map.get(data, "alsoKnownAs", []),
1542 public_key: public_key,
1543 inbox: data["inbox"],
1544 shared_inbox: shared_inbox,
1545 pinned_objects: pinned_objects,
1550 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1551 generated = "#{username}@#{URI.parse(data["id"]).host}"
1553 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1554 case WebFinger.finger(generated) do
1555 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1563 # nickname can be nil because of virtual actors
1564 defp generate_nickname(_), do: nil
1566 def fetch_follow_information_for_user(user) do
1567 with {:ok, following_data} <-
1568 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1569 {:ok, hide_follows} <- collection_private(following_data),
1570 {:ok, followers_data} <-
1571 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1572 {:ok, hide_followers} <- collection_private(followers_data) do
1575 hide_follows: hide_follows,
1576 follower_count: normalize_counter(followers_data["totalItems"]),
1577 following_count: normalize_counter(following_data["totalItems"]),
1578 hide_followers: hide_followers
1581 {:error, _} = e -> e
1586 defp normalize_counter(counter) when is_integer(counter), do: counter
1587 defp normalize_counter(_), do: 0
1589 def maybe_update_follow_information(user_data) do
1590 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1591 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1593 {:collections_available,
1594 !!(user_data[:following_address] && user_data[:follower_address])},
1596 fetch_follow_information_for_user(user_data) do
1597 info = Map.merge(user_data[:info] || %{}, info)
1600 |> Map.put(:info, info)
1602 {:user_type_check, false} ->
1605 {:collections_available, false} ->
1608 {:enabled, false} ->
1613 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1620 defp collection_private(%{"first" => %{"type" => type}})
1621 when type in ["CollectionPage", "OrderedCollectionPage"],
1624 defp collection_private(%{"first" => first}) do
1625 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1626 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1629 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1630 {:error, _} = e -> e
1635 defp collection_private(_data), do: {:ok, true}
1637 def user_data_from_user_object(data, additional \\ []) do
1638 with {:ok, data} <- MRF.filter(data) do
1639 {:ok, object_to_user_data(data, additional)}
1645 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1646 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1647 {:ok, data} <- user_data_from_user_object(data, additional) do
1648 {:ok, maybe_update_follow_information(data)}
1650 # If this has been deleted, only log a debug and not an error
1651 {:error, "Object has been deleted" = e} ->
1652 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1655 {:error, {:reject, reason} = e} ->
1656 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1660 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1665 def maybe_handle_clashing_nickname(data) do
1666 with nickname when is_binary(nickname) <- data[:nickname],
1667 %User{} = old_user <- User.get_by_nickname(nickname),
1668 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1670 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1674 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1675 |> User.update_and_set_cache()
1677 {:ap_id_comparison, true} ->
1679 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1687 def pin_data_from_featured_collection(%{
1688 "type" => "OrderedCollection",
1691 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1693 |> Map.get("orderedItems")
1694 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1697 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1702 def pin_data_from_featured_collection(
1707 when type in ["OrderedCollection", "Collection"] do
1708 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1710 # Items can either be a map _or_ a string
1713 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1714 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1718 def fetch_and_prepare_featured_from_ap_id(nil) do
1722 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1723 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1724 {:ok, pin_data_from_featured_collection(data)}
1727 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1732 def pinned_fetch_task(nil), do: nil
1734 def pinned_fetch_task(%{pinned_objects: pins}) do
1735 if Enum.all?(pins, fn {ap_id, _} ->
1736 Object.get_cached_by_ap_id(ap_id) ||
1737 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1745 def make_user_from_ap_id(ap_id, additional \\ []) do
1746 user = User.get_cached_by_ap_id(ap_id)
1748 if user && !User.ap_enabled?(user) do
1749 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1751 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1752 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1756 |> User.remote_user_changeset(data)
1757 |> User.update_and_set_cache()
1759 maybe_handle_clashing_nickname(data)
1762 |> User.remote_user_changeset()
1770 def make_user_from_nickname(nickname) do
1771 with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1772 WebFinger.finger(nickname) do
1773 make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1775 _e -> {:error, "No AP id in WebFinger"}
1779 # filter out broken threads
1780 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1781 entire_thread_visible_for_user?(activity, user)
1784 # do post-processing on a specific activity
1785 def contain_activity(%Activity{} = activity, %User{} = user) do
1786 contain_broken_threads(activity, user)
1789 def fetch_direct_messages_query do
1791 |> restrict_type(%{type: "Create"})
1792 |> restrict_visibility(%{visibility: "direct"})
1793 |> order_by([activity], asc: activity.id)