1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
67 defp check_actor_can_insert(_), do: true
# Compares the length of the embedded object's content with the configured
# `[:instance, :remote_limit]` value. Returns `true` when the content is not
# longer than the limit.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

# Anything without non-nil embedded object content passes the check.
defp check_remote_limit(_), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when `is_public?/1` is falsy for `object`.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when `is_public?/1` is falsy for `object`.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.update_last_status_at/1` for `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when `is_public?/1` is falsy for `object`.
"""
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88 defp increase_replies_count_if_reply(%{
89 "object" => %{"inReplyTo" => reply_ap_id} = object,
92 if is_public?(object) do
93 Object.increase_replies_count(reply_ap_id)
97 defp increase_replies_count_if_reply(_create_data), do: :noop
99 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
101 def persist(%{"type" => type} = object, meta) when type in @object_types do
102 with {:ok, object} <- Object.create(object) do
108 def persist(object, meta) do
109 with local <- Keyword.fetch!(meta, :local),
110 {recipients, _, _} <- get_recipients(object),
112 Repo.insert(%Activity{
115 recipients: recipients,
116 actor: object["actor"]
# TODO: add tests for expired activities once the Note type is supported in the new pipeline
119 {:ok, _} <- maybe_create_activity_expiration(activity) do
120 {:ok, activity, meta}
124 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
125 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
128 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
129 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
134 {:ok, map, object} <- insert_full_object(map),
135 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
136 # Splice in the child object if we have one.
137 activity = Maps.put_if_present(activity, :object, object)
139 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
140 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
143 search_module = Pleroma.Config.get([Pleroma.Search, :module])
145 ConcurrentLimiter.limit(Pleroma.Search, fn ->
146 Task.start(fn -> search_module.add_to_index(activity) end)
151 %Activity{} = activity ->
157 {:containment, _} = error ->
160 {:error, _} = error ->
163 {:fake, true, map, recipients} ->
164 activity = %Activity{
168 recipients: recipients,
172 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
175 {:remote_limit_pass, _} ->
176 {:error, :remote_limit}
183 defp insert_activity_with_expiration(data, local, recipients) do
187 actor: data["actor"],
188 recipients: recipients
191 with {:ok, activity} <- Repo.insert(struct) do
192 maybe_create_activity_expiration(activity)
@doc """
Creates notifications for `activity`, then creates or bumps its conversation
and streams out the resulting participations.

Returns whatever `stream_out_participations/1` returns.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  activity
  |> create_or_bump_conversation(activity.actor)
  |> get_participations()
  |> stream_out_participations()
end
205 defp maybe_create_activity_expiration(
206 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
209 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
210 activity_id: activity.id,
211 expires_at: expires_at
217 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
219 defp create_or_bump_conversation(activity, actor) do
220 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
221 %User{} = user <- User.get_cached_by_ap_id(actor) do
222 Participation.mark_as_read(user, conversation)
227 defp get_participations({:ok, conversation}) do
229 |> Repo.preload(:participations, force: true)
230 |> Map.get(:participations)
233 defp get_participations(_), do: []
235 def stream_out_participations(participations) do
238 |> Repo.preload(:user)
240 Streamer.stream("participation", participations)
244 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
245 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
246 conversation = Repo.preload(conversation, :participations)
249 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
254 if last_activity_id do
255 stream_out_participations(conversation.participations)
261 def stream_out_participations(_, _), do: :noop
264 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
265 when data_type in ["Create", "Announce", "Delete"] do
267 |> Topics.get_activity_topics()
268 |> Streamer.stream(activity)
272 def stream_out(_activity) do
276 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
277 def create(params, fake \\ false) do
278 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
283 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
284 additional = params[:additional] || %{}
285 # only accept false as false value
286 local = !(params[:local] == false)
287 published = params[:published]
288 quick_insert? = Config.get([:env]) == :benchmark
292 %{to: to, actor: actor, published: published, context: context, object: object},
296 with {:ok, activity} <- insert(create_data, local, fake),
297 {:fake, false, activity} <- {:fake, fake, activity},
298 _ <- increase_replies_count_if_reply(create_data),
299 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
300 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
301 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
302 _ <- notify_and_stream(activity),
303 :ok <- maybe_schedule_poll_notifications(activity),
304 :ok <- maybe_federate(activity) do
307 {:quick_insert, true, activity} ->
310 {:fake, true, activity} ->
314 Repo.rollback(message)
318 defp maybe_schedule_poll_notifications(activity) do
319 PollWorker.schedule_poll_end(activity)
323 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
324 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
325 additional = params[:additional] || %{}
326 # only accept false as false value
327 local = !(params[:local] == false)
328 published = params[:published]
332 %{to: to, actor: actor, published: published, context: context, object: object},
336 with {:ok, activity} <- insert(listen_data, local),
337 _ <- notify_and_stream(activity),
338 :ok <- maybe_federate(activity) do
343 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
344 {:ok, Activity.t()} | nil | {:error, any()}
345 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
346 with {:ok, result} <-
347 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
352 defp do_unfollow(follower, followed, activity_id, local) do
353 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
354 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
355 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
356 {:ok, activity} <- insert(unfollow_data, local),
357 _ <- notify_and_stream(activity),
358 :ok <- maybe_federate(activity) do
362 {:error, error} -> Repo.rollback(error)
366 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
368 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
382 # only accept false as false value
383 local = !(params[:local] == false)
384 forward = !(params[:forward] == false)
386 additional = params[:additional] || %{}
390 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
392 Map.merge(additional, %{"to" => [], "cc" => []})
395 with flag_data <- make_flag_data(params, additional),
396 {:ok, activity} <- insert(flag_data, local),
397 {:ok, stripped_activity} <- strip_report_status_data(activity),
398 _ <- notify_and_stream(activity),
400 maybe_federate(stripped_activity) do
401 User.all_superusers()
402 |> Enum.filter(fn user -> user.ap_id != actor end)
403 |> Enum.filter(fn user -> not is_nil(user.email) end)
404 |> Enum.each(fn superuser ->
406 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
407 |> Pleroma.Emails.Mailer.deliver_async()
412 {:error, error} -> Repo.rollback(error)
416 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
417 def move(%User{} = origin, %User{} = target, local \\ true) do
420 "actor" => origin.ap_id,
421 "object" => origin.ap_id,
422 "target" => target.ap_id
425 with true <- origin.ap_id in target.also_known_as,
426 {:ok, activity} <- insert(params, local),
427 _ <- notify_and_stream(activity) do
428 maybe_federate(activity)
430 BackgroundWorker.enqueue("move_following", %{
431 "origin_id" => origin.id,
432 "target_id" => target.id
437 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
442 def fetch_activities_for_context_query(context, opts) do
443 public = [Constants.as_public()]
447 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
450 from(activity in Activity)
451 |> maybe_preload_objects(opts)
452 |> maybe_preload_bookmarks(opts)
453 |> maybe_set_thread_muted_field(opts)
454 |> restrict_blocked(opts)
455 |> restrict_blockers_visibility(opts)
456 |> restrict_recipients(recipients, opts[:user])
457 |> restrict_filtered(opts)
461 "?->>'type' = ? and ?->>'context' = ?",
468 |> exclude_poll_votes(opts)
470 |> order_by([activity], desc: activity.id)
473 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
474 def fetch_activities_for_context(context, opts \\ %{}) do
476 |> fetch_activities_for_context_query(opts)
480 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
481 FlakeId.Ecto.CompatType.t() | nil
482 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
484 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
485 |> restrict_visibility(%{visibility: "direct"})
# Paginates `query` with `:skip_extra_order` forced on.
#
# Tag-filtering functions may already apply "ORDER BY objects.id DESC"; adding
# the extra "activities.id DESC NULLS LAST" ordering on top would worsen the
# query plan, so pagination is told to skip it.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
499 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
500 list_memberships = Pleroma.List.memberships(opts[:user])
502 fetch_activities_query(recipients ++ list_memberships, opts)
503 |> fetch_paginated_optimized(opts, pagination)
505 |> maybe_update_cc(list_memberships, opts[:user])
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` key is removed from `opts` before the query is built, so the
result does not depend on who is asking. `restrict_unlisted/2` is applied with
the same options before pagination.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  public_query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  fetch_paginated_optimized(public_query, anonymous_opts, pagination)
end
518 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
519 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
521 |> Map.put(:restrict_unlisted, true)
522 |> fetch_public_or_unlisted_activities(pagination)
525 @valid_visibilities ~w[direct unlisted public private]
527 defp restrict_visibility(query, %{visibility: visibility})
528 when is_list(visibility) do
529 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
534 "activity_visibility(?, ?, ?) = ANY (?)",
542 Logger.error("Could not restrict visibility to #{visibility}")
546 defp restrict_visibility(query, %{visibility: visibility})
547 when visibility in @valid_visibilities do
551 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Rejects a `:visibility` value that is not one of `@valid_visibilities`.
#
# This clause used to return the `:ok` produced by `Logger.error/1`, which was
# then piped onwards as if it were a query and blew up later with an unrelated
# error. The failure is now raised here, right after the log entry, so the
# cause is visible at the point where it happens.
#
# A `nil` visibility is deliberately excluded from this clause (the same way
# `exclude_visibility/2` excludes `nil` in its guard): it means "no filter was
# requested" and falls through to the pass-through clause below.
defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  raise ArgumentError, "invalid visibility filter: #{inspect(visibility)}"
end

# No (or a nil) visibility filter: leave the query untouched.
defp restrict_visibility(query, _visibility), do: query
562 defp exclude_visibility(query, %{exclude_visibilities: visibility})
563 when is_list(visibility) do
564 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
569 "activity_visibility(?, ?, ?) = ANY (?)",
577 Logger.error("Could not exclude visibility to #{visibility}")
582 defp exclude_visibility(query, %{exclude_visibilities: visibility})
583 when visibility in @valid_visibilities do
588 "activity_visibility(?, ?, ?) = ?",
597 defp exclude_visibility(query, %{exclude_visibilities: visibility})
598 when visibility not in [nil | @valid_visibilities] do
599 Logger.error("Could not exclude visibility to #{visibility}")
603 defp exclude_visibility(query, _visibility), do: query
605 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
608 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
611 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
614 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
618 defp restrict_thread_visibility(query, _, _), do: query
620 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
623 |> Map.put(:user, reading_user)
624 |> Map.put(:actor_id, user.ap_id)
627 godmode: params[:godmode],
628 reading_user: reading_user
630 |> user_activities_recipients()
631 |> fetch_activities(params)
635 def fetch_user_activities(user, reading_user, params \\ %{})
637 def fetch_user_activities(user, reading_user, %{total: true} = params) do
638 result = fetch_activities_for_user(user, reading_user, params)
640 Keyword.put(result, :items, Enum.reverse(result[:items]))
643 def fetch_user_activities(user, reading_user, params) do
645 |> fetch_activities_for_user(reading_user, params)
649 defp fetch_activities_for_user(user, reading_user, params) do
652 |> Map.put(:type, ["Create", "Announce"])
653 |> Map.put(:user, reading_user)
654 |> Map.put(:actor_id, user.ap_id)
655 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
658 if User.blocks?(reading_user, user) do
662 |> Map.put(:blocking_user, reading_user)
663 |> Map.put(:muting_user, reading_user)
666 pagination_type = Map.get(params, :pagination_type) || :keyset
669 godmode: params[:godmode],
670 reading_user: reading_user
672 |> user_activities_recipients()
673 |> fetch_activities(params, pagination_type)
676 def fetch_statuses(reading_user, %{total: true} = params) do
677 result = fetch_activities_for_reading_user(reading_user, params)
678 Keyword.put(result, :items, Enum.reverse(result[:items]))
681 def fetch_statuses(reading_user, params) do
683 |> fetch_activities_for_reading_user(params)
687 defp fetch_activities_for_reading_user(reading_user, params) do
688 params = Map.put(params, :type, ["Create", "Announce"])
691 godmode: params[:godmode],
692 reading_user: reading_user
694 |> user_activities_recipients()
695 |> fetch_activities(params, :offset)
698 defp user_activities_recipients(%{godmode: true}), do: []
700 defp user_activities_recipients(%{reading_user: reading_user}) do
702 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
704 [Constants.as_public()]
708 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
709 raise "Can't use the child object without preloading!"
712 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
714 [activity, object] in query,
717 "?->>'type' != ? or ?->>'actor' != ?",
726 defp restrict_announce_object_actor(query, _), do: query
# Limits the query to activities whose id is greater than `:since_id`.
# An empty-string `:since_id` and a missing key both leave the query as is.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
736 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
737 raise_on_missing_preload()
740 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
742 [_activity, object] in query,
743 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
747 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
748 restrict_embedded_tag_any(query, %{tag: tag})
751 defp restrict_embedded_tag_all(query, _), do: query
753 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
754 raise_on_missing_preload()
757 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
759 [_activity, object] in query,
760 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
764 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
765 restrict_embedded_tag_any(query, %{tag: [tag]})
768 defp restrict_embedded_tag_any(query, _), do: query
770 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
771 raise_on_missing_preload()
774 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
776 [_activity, object] in query,
777 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
781 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
782 when is_binary(tag_reject) do
783 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
786 defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query selecting the distinct `object_id`s from "hashtags_objects"
# whose joined hashtag name is one of `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
796 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
797 raise_on_missing_preload()
800 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
801 restrict_hashtag_any(query, %{tag: single_tag})
804 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
806 [_activity, object] in query,
810 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
811 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
812 AND hashtags_objects.object_id = ?) @> ?
821 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
822 restrict_hashtag_all(query, %{tag_all: [tag]})
825 defp restrict_hashtag_all(query, _), do: query
827 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
828 raise_on_missing_preload()
831 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
833 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
836 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
838 [_activity, object] in query,
839 join: hto in "hashtags_objects",
840 on: hto.object_id == object.id,
841 where: hto.hashtag_id in ^hashtag_ids,
842 distinct: [desc: object.id],
843 order_by: [desc: object.id]
847 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
848 restrict_hashtag_any(query, %{tag: [tag]})
851 defp restrict_hashtag_any(query, _), do: query
853 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
854 raise_on_missing_preload()
857 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
859 [_activity, object] in query,
860 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
864 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
865 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
868 defp restrict_hashtag_reject_any(query, _), do: query
# Raises a `RuntimeError` explaining that a filter needing the child object
# was requested without the object being preloaded.
defp raise_on_missing_preload,
  do: raise(RuntimeError, "Can't use the child object without preloading!")
874 defp restrict_recipients(query, [], _user), do: query
876 defp restrict_recipients(query, recipients, nil) do
877 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
880 defp restrict_recipients(query, recipients, user) do
883 where: fragment("? && ?", ^recipients, activity.recipients),
884 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities whose `local` field is true.
# Any other options leave the query unchanged.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
# With `remote: true`, keeps only activities whose `local` field is false.
# Any other options leave the query unchanged.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
# Keeps only activities whose `actor` equals `:actor_id` when that option is
# present; otherwise the query is returned unchanged.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
# Filters on the activity's `data->>'type'`.
#
# A binary `:type` is compared for equality; any other `:type` value is handed
# to `ANY(?)` (presumably a list of type names; confirm against callers).
# Without a `:type` option the query is returned unchanged.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
# Keeps only activities whose `data->>'state'` equals `:state` when that option
# is present; otherwise the query is returned unchanged.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
922 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
924 [_activity, object] in query,
925 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
929 defp restrict_favorited_by(query, _), do: query
# The media filter inspects the joined object, so it cannot be combined with
# `skip_preload: true`.
#
# Only `only_media: true` triggers the raise: the filter itself is applied
# solely for that value, so `only_media: false` (or nil) together with
# `skip_preload: true` requests no media filtering and must fall through to
# the pass-through clause instead of failing.
defp restrict_media(_query, %{only_media: true, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end
935 defp restrict_media(query, %{only_media: true}) do
937 [activity, object] in query,
938 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
939 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
943 defp restrict_media(query, _), do: query
945 defp restrict_replies(query, %{exclude_replies: true}) do
947 [_activity, object] in query,
948 where: fragment("?->>'inReplyTo' is null", object.data)
952 defp restrict_replies(query, %{
953 reply_filtering_user: %User{} = user,
954 reply_visibility: "self"
957 [activity, object] in query,
960 "?->>'inReplyTo' is null OR ? = ANY(?)",
968 defp restrict_replies(query, %{
969 reply_filtering_user: %User{} = user,
970 reply_visibility: "following"
973 [activity, object] in query,
977 ?->>'type' != 'Create' -- This isn't a Create
978 OR ?->>'inReplyTo' is null -- this isn't a reply
979 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
980 -- unless they are the author (because authors
981 -- are also part of the recipients). This leads
982 -- to a bug that self-replies by friends won't
984 OR ? = ? -- The actor is us
988 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
997 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose `data->>'type'` is
# 'Announce'. Any other options leave the query unchanged.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
1005 defp restrict_muted(query, %{with_muted: true}), do: query
1007 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1008 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1011 from([activity] in query,
1012 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1015 "not (?->'to' \\?| ?) or ? = ?",
1023 unless opts[:skip_preload] do
1024 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1030 defp restrict_muted(query, _), do: query
1032 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1033 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1034 domain_blocks = user.domain_blocks || []
1036 following_ap_ids = User.get_friends_ap_ids(user)
1039 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1042 [activity, object: o] in query,
1043 # You don't block the author
1044 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1046 # You don't block any recipients, and didn't author the post
1049 "((not (? && ?)) or ? = ?)",
1050 activity.recipients,
1056 # You don't block the domain of any recipients, and didn't author the post
1059 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1060 activity.recipients,
1066 # It's not a boost of a user you block
1069 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1075 # You don't block the author's domain, and also don't follow the author
1078 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1085 # Same as above, but checks the Object
1088 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1097 defp restrict_blocked(query, _), do: query
1099 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1100 if Config.get([:activitypub, :blockers_visible]) == true do
1103 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1107 # The author doesn't block you
1108 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1110 # It's not a boost of a user that blocks you
1113 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1122 defp restrict_blockers_visibility(query, _), do: query
1124 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1129 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1131 ^[Constants.as_public()]
1136 defp restrict_unlisted(query, _), do: query
1138 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1140 [activity, object: o] in query,
1143 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1152 defp restrict_pinned(query, _), do: query
1154 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1155 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1161 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1169 defp restrict_muted_reblogs(query, _), do: query
1171 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1174 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1178 defp restrict_instance(query, _), do: query
1180 defp restrict_filtered(query, %{user: %User{} = user}) do
1181 case Filter.compose_regex(user) do
1186 from([activity, object] in query,
1188 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1189 activity.actor == ^user.ap_id
1194 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1195 restrict_filtered(query, %{user: user})
1198 defp restrict_filtered(query, _), do: query
1200 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1202 defp exclude_poll_votes(query, _) do
1203 if has_named_binding?(query, :object) do
1204 from([activity, object: o] in query,
1205 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1212 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1214 defp exclude_chat_messages(query, _) do
1215 if has_named_binding?(query, :object) do
1216 from([activity, object: o] in query,
1217 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1224 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1226 defp exclude_invisible_actors(query, _opts) do
1228 User.Query.build(%{invisible: true, select: [:ap_id]})
1230 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1232 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity whose id equals `:exclude_id` when that option is a
# binary; otherwise the query is returned unchanged.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1241 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1243 defp maybe_preload_objects(query, _) do
1245 |> Activity.with_preloaded_object()
1248 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1250 defp maybe_preload_bookmarks(query, opts) do
1252 |> Activity.with_preloaded_bookmark(opts[:user])
1255 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1257 |> Activity.with_preloaded_report_notes()
1260 defp maybe_preload_report_notes(query, _), do: query
1262 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1264 defp maybe_set_thread_muted_field(query, opts) do
1266 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1269 defp maybe_order(query, %{order: :desc}) do
1271 |> order_by(desc: :id)
1274 defp maybe_order(query, %{order: :asc}) do
1276 |> order_by(asc: :id)
1279 defp maybe_order(query, _), do: query
1281 defp normalize_fetch_activities_query_opts(opts) do
1282 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1284 value when is_bitstring(value) ->
1285 Map.put(opts, key, Hashtag.normalize_name(value))
1287 value when is_list(value) ->
1290 |> Enum.map(&Hashtag.normalize_name/1)
1293 Map.put(opts, key, normalized_value)
1301 defp fetch_activities_query_ap_ids_ops(opts) do
1302 source_user = opts[:muting_user]
1303 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1305 ap_id_relationships =
1306 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1307 [:block | ap_id_relationships]
1312 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1314 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1315 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1317 restrict_muted_reblogs_opts =
1318 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1320 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
@doc """
Builds the activity query for `recipients`, piping `Activity` through every
preload, ordering, restriction and exclusion step below; each step receives
`opts` (or a derived option map) and decides for itself whether to apply.

`opts` is first normalized (hashtag names) and enriched with the muting
user's block/mute relationship ids. Hashtag restrictions go through the
`restrict_hashtag_*` steps when the `:improved_hashtag_timeline` feature is
enabled and through the `restrict_embedded_tag_*` steps otherwise.

Returns the composed query.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: it used to be piped a second time further down
    # with the same `opts`, which could only repeat the same condition.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
@doc """
Fetches the activities `user` has favourited, newest favourite first.

The query starts from the user's "Like" activities, joins the liked object
and its activity, and orders by the like's id (descending). Each returned
activity carries the liked object and uses the like's id as `pagination_id`,
so pagination follows the order in which favourites were added.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering above must survive pagination, hence `skip_order: true`.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
# For every activity whose "bcc" contains one of `list_memberships`, prepends
# the user's ap_id to the activity's "cc". Activities without a non-empty
# "bcc", or whose "bcc" shares nothing with `list_memberships`, pass through
# unchanged. With no list memberships (or no user) the input is returned as is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a bare value; wrapping it keeps the
        # result a proper list instead of `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` while also containing the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
@doc """
Fetches a page of activities bounded by two recipient sets.

The base query is built with an empty recipient list; the recipient bounds
are then applied by `fetch_activities_bounded_query/3`. The fetched page is
reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores `file` through `Upload.store/2` and inserts an object holding the
resulting data, with `"actor"` set when `opts[:actor]` is present.

Any non-`{:ok, data}` result of `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      data
      |> Maps.put_if_present("actor", opts[:actor])
      |> then(&Repo.insert(%Object{data: &1}))

    failure ->
      failure
  end
end
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a link map with a string "href", or a list of either (only the
# first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
# Normalizes an actor's image value into `%{"type" => "Image", "url" => [...]}`.
# A list is reduced to its first element; values without a "url" key give nil.
defp normalize_image(%{"url" => url}), do: %{"type" => "Image", "url" => [%{"href" => url}]}
defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_), do: nil
# Converts a remote actor document (string-keyed map) into the atom-keyed
# attribute map for a remote user.
#
# Profile fields, custom emoji, the locked flag and chat capabilities are read
# from the document as received; everything else is read after
# `Transmogrifier.maybe_fix_user_object/1` has been applied. The featured
# collection is fetched here to fill `:pinned_objects`.
#
# Malformed entries in "attachment" / "tag" are skipped rather than raising,
# and a missing, null or single-object value is treated like a list.
defp object_to_user_data(data) do
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Only Emoji tags that carry both an icon URL and a string name are usable.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
@doc """
Fetches the remote following and follower collections of `user` and derives
counters and privacy flags from them.

Returns `{:ok, %{hide_follows, follower_count, following_count,
hide_followers}}`. Counts come from each collection's `"totalItems"`
(non-integers count as 0). Any failing step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    # Anything that is not already an error tuple gets wrapped into one.
    unexpected -> {:error, unexpected}
  end
end
# Passes integer counters through; every other value counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
@doc """
Merges freshly fetched follow information into `user_data[:info]` when
external user synchronization is enabled, the user type is "Person" or
"Service", and both collection addresses are present.

`user_data` is returned unchanged when any precondition is not met. If
fetching fails the error is logged and `user_data` is still returned.
"""
# NOTE(review): the map built by object_to_user_data/1 carries :actor_type and
# no :type key, so for that input the type check below looks like it can never
# pass — confirm which key is intended.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to false: silently keep the data as it is.
    {precondition, false}
    when precondition in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
# Decides whether a follow collection is private.
#
# Returns `{:ok, false}` when the first page is embedded, or can be fetched,
# and is a (Ordered)CollectionPage. Returns `{:ok, true}` when there is no
# "first" entry, or when fetching it is answered with 401/403. Other failures
# are returned as `{:error, reason}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs the actor document through `MRF.filter/1` and converts the accepted
result into user attributes.

Returns `{:ok, user_data}`, or `{:error, result}` where `result` is whatever
non-`{:ok, _}` value the filter produced.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
@doc """
Fetches the actor at `ap_id` and turns it into user attributes, including
follow information when that can be updated.

Returns `{:ok, user_data}` or `{:error, reason}`. Failures are logged at a
level that depends on the reason (see `log_user_fetch_failure/2`).
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      log_user_fetch_failure(ap_id, reason)
      {:error, reason}
  end
end

# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = e) do
  Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end

defp log_user_fetch_failure(ap_id, {:reject, reason}) do
  Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
end

defp log_user_fetch_failure(ap_id, e) do
  Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end
@doc """
Frees up `data[:nickname]` when it is already taken by a different actor.

If a stored user holds the nickname under another ap_id, that user is renamed
to `"<id>.<nickname>"`. If the stored user has the same ap_id, this is only
logged. Returns nil when the nickname is not a string or is not taken.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  new_ap_id = data[:ap_id]

  case is_binary(nickname) && User.get_by_nickname(nickname) do
    %User{ap_id: old_ap_id} = old_user when old_ap_id != new_ap_id ->
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      old_user
      |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
      |> User.update_and_set_cache()

    %User{} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
@doc """
Builds the pinned-objects map (`ap_id => NaiveDateTime` of processing) from a
featured collection.

Items of an `"OrderedCollection"` / `"Collection"` are read from
`"orderedItems"`. An item may be an object map with a string `"id"` or a bare
id string; anything else is skipped. Documents that are not such a collection,
or that carry no `"orderedItems"` list, yield an empty map instead of raising.
"""
def pin_data_from_featured_collection(%{"type" => type, "orderedItems" => objects})
    when type in ["OrderedCollection", "Collection"] and is_list(objects) do
  objects
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [object_ap_id]
    object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
    _malformed -> []
  end)
  |> Map.new(fn object_ap_id -> {object_ap_id, NaiveDateTime.utc_now()} end)
end

# Remote input we cannot interpret is treated as "nothing pinned".
def pin_data_from_featured_collection(_collection), do: %{}
@doc """
Fetches the featured collection at `ap_id` and converts it into pin data.

Always returns `{:ok, pins}`: a nil address or a failed fetch gives an empty
map (the failure is logged).
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      {:ok, pin_data_from_featured_collection(data)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
@doc """
Makes sure every pinned object of the given user data is available, fetching
those that are not cached.

Returns `:ok` when every pin is cached or was fetched, `:error` otherwise
(checking stops at the first failure), and nil when given nil.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
@doc """
Creates or refreshes the user identified by `ap_id` from its remote actor.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched;
a cached user is updated with the fetched data, while an unknown one is
inserted after any clashing nickname has been dealt with. Pinned objects are
fetched in a separate task. A failed fetch is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  user = User.get_cached_by_ap_id(ap_id)

  cond do
    user && !User.ap_enabled?(user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        # Fire and forget: pins are fetched without delaying the caller.
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

        if user do
          changeset = User.remote_user_changeset(user, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end
      end
  end
end
@doc """
Resolves `nickname` through WebFinger and builds the user from the resulting
ap_id. Returns `{:error, "No AP id in WebFinger"}` when no ap_id is found.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: the result is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processing check for a single activity; currently this only applies the
broken-thread containment for `user`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
@doc """
Query for "Create" activities with direct visibility, oldest first (ascending
activity id).
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end