1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
67 defp check_actor_can_insert(_), do: true
# Checks the embedded object's "content" against the configured remote limit.
#
# Returns `true` when `String.length/1` of the content does not exceed the
# `[:instance, :remote_limit]` config value, `false` otherwise.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

# Anything without an embedded object carrying non-nil "content" passes.
defp check_remote_limit(_), do: true
@doc """
Increases the actor's note count when `object` is public.

Delegates to `User.increase_note_count/1` if `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` unchanged.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
end
@doc """
Decreases the actor's note count when `object` is public.

Delegates to `User.decrease_note_count/1` if `is_public?/1` holds for
`object`; otherwise returns `{:ok, actor}` unchanged.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
end
84 defp increase_replies_count_if_reply(%{
85 "object" => %{"inReplyTo" => reply_ap_id} = object,
88 if is_public?(object) do
89 Object.increase_replies_count(reply_ap_id)
93 defp increase_replies_count_if_reply(_create_data), do: :noop
95 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
97 def persist(%{"type" => type} = object, meta) when type in @object_types do
98 with {:ok, object} <- Object.create(object) do
104 def persist(object, meta) do
105 with local <- Keyword.fetch!(meta, :local),
106 {recipients, _, _} <- get_recipients(object),
108 Repo.insert(%Activity{
111 recipients: recipients,
112 actor: object["actor"]
114 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
115 {:ok, _} <- maybe_create_activity_expiration(activity) do
116 {:ok, activity, meta}
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
125 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map),
131 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
132 # Splice in the child object if we have one.
133 activity = Maps.put_if_present(activity, :object, object)
135 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
136 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
141 %Activity{} = activity ->
147 {:containment, _} = error ->
150 {:error, _} = error ->
153 {:fake, true, map, recipients} ->
154 activity = %Activity{
158 recipients: recipients,
162 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
165 {:remote_limit_pass, _} ->
166 {:error, :remote_limit}
173 defp insert_activity_with_expiration(data, local, recipients) do
177 actor: data["actor"],
178 recipients: recipients
181 with {:ok, activity} <- Repo.insert(struct) do
182 maybe_create_activity_expiration(activity)
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
192 stream_out_participations(participations)
195 defp maybe_create_activity_expiration(
196 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
199 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
200 activity_id: activity.id,
201 expires_at: expires_at
207 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
209 defp create_or_bump_conversation(activity, actor) do
210 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
211 %User{} = user <- User.get_cached_by_ap_id(actor) do
212 Participation.mark_as_read(user, conversation)
217 defp get_participations({:ok, conversation}) do
219 |> Repo.preload(:participations, force: true)
220 |> Map.get(:participations)
223 defp get_participations(_), do: []
225 def stream_out_participations(participations) do
228 |> Repo.preload(:user)
230 Streamer.stream("participation", participations)
234 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
235 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
236 conversation = Repo.preload(conversation, :participations)
239 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
251 def stream_out_participations(_, _), do: :noop
254 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
255 when data_type in ["Create", "Announce", "Delete"] do
257 |> Topics.get_activity_topics()
258 |> Streamer.stream(activity)
262 def stream_out(_activity) do
266 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
267 def create(params, fake \\ false) do
268 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
273 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
274 additional = params[:additional] || %{}
275 # only accept false as false value
276 local = !(params[:local] == false)
277 published = params[:published]
278 quick_insert? = Config.get([:env]) == :benchmark
282 %{to: to, actor: actor, published: published, context: context, object: object},
286 with {:ok, activity} <- insert(create_data, local, fake),
287 {:fake, false, activity} <- {:fake, fake, activity},
288 _ <- increase_replies_count_if_reply(create_data),
289 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
290 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
291 _ <- notify_and_stream(activity),
292 :ok <- maybe_schedule_poll_notifications(activity),
293 :ok <- maybe_federate(activity) do
296 {:quick_insert, true, activity} ->
299 {:fake, true, activity} ->
303 Repo.rollback(message)
307 defp maybe_schedule_poll_notifications(activity) do
308 PollWorker.schedule_poll_end(activity)
312 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
313 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
314 additional = params[:additional] || %{}
315 # only accept false as false value
316 local = !(params[:local] == false)
317 published = params[:published]
321 %{to: to, actor: actor, published: published, context: context, object: object},
325 with {:ok, activity} <- insert(listen_data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
332 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
333 {:ok, Activity.t()} | nil | {:error, any()}
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with {:ok, result} <-
336 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
341 defp do_unfollow(follower, followed, activity_id, local) do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
344 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
345 {:ok, activity} <- insert(unfollow_data, local),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
351 {:error, error} -> Repo.rollback(error)
355 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
357 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
371 # only accept false as false value
372 local = !(params[:local] == false)
373 forward = !(params[:forward] == false)
375 additional = params[:additional] || %{}
379 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
381 Map.merge(additional, %{"to" => [], "cc" => []})
384 with flag_data <- make_flag_data(params, additional),
385 {:ok, activity} <- insert(flag_data, local),
386 {:ok, stripped_activity} <- strip_report_status_data(activity),
387 _ <- notify_and_stream(activity),
389 maybe_federate(stripped_activity) do
390 User.all_superusers()
391 |> Enum.filter(fn user -> user.ap_id != actor end)
392 |> Enum.filter(fn user -> not is_nil(user.email) end)
393 |> Enum.each(fn superuser ->
395 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
396 |> Pleroma.Emails.Mailer.deliver_async()
401 {:error, error} -> Repo.rollback(error)
405 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
406 def move(%User{} = origin, %User{} = target, local \\ true) do
409 "actor" => origin.ap_id,
410 "object" => origin.ap_id,
411 "target" => target.ap_id
414 with true <- origin.ap_id in target.also_known_as,
415 {:ok, activity} <- insert(params, local),
416 _ <- notify_and_stream(activity) do
417 maybe_federate(activity)
419 BackgroundWorker.enqueue("move_following", %{
420 "origin_id" => origin.id,
421 "target_id" => target.id
426 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
431 def fetch_activities_for_context_query(context, opts) do
432 public = [Constants.as_public()]
436 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
439 from(activity in Activity)
440 |> maybe_preload_objects(opts)
441 |> maybe_preload_bookmarks(opts)
442 |> maybe_set_thread_muted_field(opts)
443 |> restrict_blocked(opts)
444 |> restrict_recipients(recipients, opts[:user])
445 |> restrict_filtered(opts)
449 "?->>'type' = ? and ?->>'context' = ?",
456 |> exclude_poll_votes(opts)
458 |> order_by([activity], desc: activity.id)
461 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
462 def fetch_activities_for_context(context, opts \\ %{}) do
464 |> fetch_activities_for_context_query(opts)
468 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
469 FlakeId.Ecto.CompatType.t() | nil
470 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
472 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
473 |> restrict_visibility(%{visibility: "direct"})
# Runs `query` through `Pagination.fetch_paginated/3` with `:skip_extra_order`
# forced to `true` in `opts`.
defp fetch_paginated_optimized(query, opts, pagination) do
  # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  #   and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
  opts = Map.put(opts, :skip_extra_order, true)

  Pagination.fetch_paginated(query, opts, pagination)
end
487 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
488 list_memberships = Pleroma.List.memberships(opts[:user])
490 fetch_activities_query(recipients ++ list_memberships, opts)
491 |> fetch_paginated_optimized(opts, pagination)
493 |> maybe_update_cc(list_memberships, opts[:user])
@doc """
Fetches a page of activities addressed to the public collection.

Any `:user` entry is removed from `opts` before the query is built. The query
is restricted to the `Constants.as_public()` recipient, passed through
`restrict_unlisted/2` with the same `opts`, and then paginated using the given
`pagination` type (`:keyset` by default).
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)

  [Constants.as_public()]
  |> fetch_activities_query(opts)
  |> restrict_unlisted(opts)
  |> fetch_paginated_optimized(opts, pagination)
end
506 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
507 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
509 |> Map.put(:restrict_unlisted, true)
510 |> fetch_public_or_unlisted_activities(pagination)
513 @valid_visibilities ~w[direct unlisted public private]
515 defp restrict_visibility(query, %{visibility: visibility})
516 when is_list(visibility) do
517 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
522 "activity_visibility(?, ?, ?) = ANY (?)",
530 Logger.error("Could not restrict visibility to #{visibility}")
534 defp restrict_visibility(query, %{visibility: visibility})
535 when visibility in @valid_visibilities do
539 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
543 defp restrict_visibility(_query, %{visibility: visibility})
544 when visibility not in @valid_visibilities do
545 Logger.error("Could not restrict visibility to #{visibility}")
548 defp restrict_visibility(query, _visibility), do: query
550 defp exclude_visibility(query, %{exclude_visibilities: visibility})
551 when is_list(visibility) do
552 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
557 "activity_visibility(?, ?, ?) = ANY (?)",
565 Logger.error("Could not exclude visibility to #{visibility}")
570 defp exclude_visibility(query, %{exclude_visibilities: visibility})
571 when visibility in @valid_visibilities do
576 "activity_visibility(?, ?, ?) = ?",
585 defp exclude_visibility(query, %{exclude_visibilities: visibility})
586 when visibility not in [nil | @valid_visibilities] do
587 Logger.error("Could not exclude visibility to #{visibility}")
591 defp exclude_visibility(query, _visibility), do: query
593 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
596 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
599 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
602 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
606 defp restrict_thread_visibility(query, _, _), do: query
608 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
611 |> Map.put(:user, reading_user)
612 |> Map.put(:actor_id, user.ap_id)
615 godmode: params[:godmode],
616 reading_user: reading_user
618 |> user_activities_recipients()
619 |> fetch_activities(params)
623 def fetch_user_activities(user, reading_user, params \\ %{})
625 def fetch_user_activities(user, reading_user, %{total: true} = params) do
626 result = fetch_activities_for_user(user, reading_user, params)
628 Keyword.put(result, :items, Enum.reverse(result[:items]))
631 def fetch_user_activities(user, reading_user, params) do
633 |> fetch_activities_for_user(reading_user, params)
637 defp fetch_activities_for_user(user, reading_user, params) do
640 |> Map.put(:type, ["Create", "Announce"])
641 |> Map.put(:user, reading_user)
642 |> Map.put(:actor_id, user.ap_id)
643 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
646 if User.blocks?(reading_user, user) do
650 |> Map.put(:blocking_user, reading_user)
651 |> Map.put(:muting_user, reading_user)
654 pagination_type = Map.get(params, :pagination_type) || :keyset
657 godmode: params[:godmode],
658 reading_user: reading_user
660 |> user_activities_recipients()
661 |> fetch_activities(params, pagination_type)
664 def fetch_statuses(reading_user, %{total: true} = params) do
665 result = fetch_activities_for_reading_user(reading_user, params)
666 Keyword.put(result, :items, Enum.reverse(result[:items]))
669 def fetch_statuses(reading_user, params) do
671 |> fetch_activities_for_reading_user(params)
675 defp fetch_activities_for_reading_user(reading_user, params) do
676 params = Map.put(params, :type, ["Create", "Announce"])
679 godmode: params[:godmode],
680 reading_user: reading_user
682 |> user_activities_recipients()
683 |> fetch_activities(params, :offset)
686 defp user_activities_recipients(%{godmode: true}), do: []
688 defp user_activities_recipients(%{reading_user: reading_user}) do
690 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
692 [Constants.as_public()]
696 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
697 raise "Can't use the child object without preloading!"
700 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
702 [activity, object] in query,
705 "?->>'type' != ? or ?->>'actor' != ?",
714 defp restrict_announce_object_actor(query, _), do: query
# Restricts the query to activities whose id is greater than `:since_id`.

# An empty-string `:since_id` is treated as "no restriction".
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  from(activity in query, where: activity.id > ^since_id)
end

# No `:since_id` given: leave the query untouched.
defp restrict_since(query, _), do: query
724 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
725 raise_on_missing_preload()
728 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
730 [_activity, object] in query,
731 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
735 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
736 restrict_embedded_tag_any(query, %{tag: tag})
739 defp restrict_embedded_tag_all(query, _), do: query
741 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
742 raise_on_missing_preload()
745 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
747 [_activity, object] in query,
748 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
752 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
753 restrict_embedded_tag_any(query, %{tag: [tag]})
756 defp restrict_embedded_tag_any(query, _), do: query
758 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
759 raise_on_missing_preload()
762 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
764 [_activity, object] in query,
765 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
769 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
770 when is_binary(tag_reject) do
771 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
774 defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query selecting the distinct `object_id` values from
# "hashtags_objects" whose joined hashtag has a name contained in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects")
  |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
  |> where([hto, ht], ht.name in ^tags)
  |> select([hto], hto.object_id)
  |> distinct([hto], true)
end
784 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
785 raise_on_missing_preload()
788 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
789 restrict_hashtag_any(query, %{tag: single_tag})
792 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
794 [_activity, object] in query,
798 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
799 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
800 AND hashtags_objects.object_id = ?) @> ?
809 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
810 restrict_hashtag_all(query, %{tag_all: [tag]})
813 defp restrict_hashtag_all(query, _), do: query
815 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
816 raise_on_missing_preload()
819 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
821 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
824 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
826 [_activity, object] in query,
827 join: hto in "hashtags_objects",
828 on: hto.object_id == object.id,
829 where: hto.hashtag_id in ^hashtag_ids,
830 distinct: [desc: object.id],
831 order_by: [desc: object.id]
835 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
836 restrict_hashtag_any(query, %{tag: [tag]})
839 defp restrict_hashtag_any(query, _), do: query
841 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
842 raise_on_missing_preload()
845 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
847 [_activity, object] in query,
848 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
852 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
853 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
856 defp restrict_hashtag_reject_any(query, _), do: query
# Raises a RuntimeError; called by the tag/hashtag restrictors when they are
# asked to filter on the child object while `skip_preload: true` is set.
defp raise_on_missing_preload do
  raise "Can't use the child object without preloading!"
end
862 defp restrict_recipients(query, [], _user), do: query
864 defp restrict_recipients(query, recipients, nil) do
865 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
868 defp restrict_recipients(query, recipients, user) do
871 where: fragment("? && ?", ^recipients, activity.recipients),
872 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities whose `local` column is true.
defp restrict_local(query, %{local_only: true}) do
  from(activity in query, where: activity.local == true)
end

# Any other opts: leave the query untouched.
defp restrict_local(query, _), do: query
# With `remote: true`, keeps only activities whose `local` column is false.
defp restrict_remote(query, %{remote: true}) do
  from(activity in query, where: activity.local == false)
end

# Any other opts: leave the query untouched.
defp restrict_remote(query, _), do: query
# Keeps only activities whose `actor` column equals the given `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  from(activity in query, where: activity.actor == ^actor_id)
end

# No `:actor_id` given: leave the query untouched.
defp restrict_actor(query, _), do: query
# Restricts the query by the activity's `data->>'type'`.

# A single type given as a binary is matched by equality.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
end

# Any other `:type` value is matched with `= ANY(?)`
# (presumably a list of type names; verify against callers).
defp restrict_type(query, %{type: type}) do
  from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

# No `:type` given: leave the query untouched.
defp restrict_type(query, _), do: query
# Keeps only activities whose `data->>'state'` equals the given `:state`.
defp restrict_state(query, %{state: state}) do
  from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
end

# No `:state` given: leave the query untouched.
defp restrict_state(query, _), do: query
# Keeps only rows whose second binding (named `object` here) has `ap_id`
# among the keys/elements of its `data->'likes'` JSON value.
# NOTE(review): the `from(` wrapper was missing from the damaged listing and
# is restored from the surviving `binding in query, where:` arguments.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  from(
    [_activity, object] in query,
    where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

# No `:favorited_by` given: leave the query untouched.
defp restrict_favorited_by(query, _), do: query
919 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
920 raise "Can't use the child object without preloading!"
923 defp restrict_media(query, %{only_media: true}) do
925 [activity, object] in query,
926 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
927 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
931 defp restrict_media(query, _), do: query
933 defp restrict_replies(query, %{exclude_replies: true}) do
935 [_activity, object] in query,
936 where: fragment("?->>'inReplyTo' is null", object.data)
940 defp restrict_replies(query, %{
941 reply_filtering_user: %User{} = user,
942 reply_visibility: "self"
945 [activity, object] in query,
948 "?->>'inReplyTo' is null OR ? = ANY(?)",
956 defp restrict_replies(query, %{
957 reply_filtering_user: %User{} = user,
958 reply_visibility: "following"
961 [activity, object] in query,
965 ?->>'type' != 'Create' -- This isn't a Create
966 OR ?->>'inReplyTo' is null -- this isn't a reply
967 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
968 -- unless they are the author (because authors
969 -- are also part of the recipients). This leads
970 -- to a bug that self-replies by friends won't
972 OR ? = ? -- The actor is us
976 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
985 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose type is "Announce".
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
end

# Any other opts: leave the query untouched.
defp restrict_reblogs(query, _), do: query
993 defp restrict_muted(query, %{with_muted: true}), do: query
995 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
996 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
999 from([activity] in query,
1000 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1003 "not (?->'to' \\?| ?) or ? = ?",
1011 unless opts[:skip_preload] do
1012 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1018 defp restrict_muted(query, _), do: query
1020 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1021 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1022 domain_blocks = user.domain_blocks || []
1024 following_ap_ids = User.get_friends_ap_ids(user)
1027 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1030 [activity, object: o] in query,
1031 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1034 "((not (? && ?)) or ? = ?)",
1035 activity.recipients,
1042 "recipients_contain_blocked_domains(?, ?) = false",
1043 activity.recipients,
1048 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1055 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1063 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1072 defp restrict_blocked(query, _), do: query
1074 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1079 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1081 ^[Constants.as_public()]
1086 defp restrict_unlisted(query, _), do: query
1088 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1090 [activity, object: o] in query,
1093 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1102 defp restrict_pinned(query, _), do: query
1104 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1105 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1111 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1119 defp restrict_muted_reblogs(query, _), do: query
1121 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1124 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1128 defp restrict_instance(query, _), do: query
1130 defp restrict_filtered(query, %{user: %User{} = user}) do
1131 case Filter.compose_regex(user) do
1136 from([activity, object] in query,
1138 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1139 activity.actor == ^user.ap_id
1144 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1145 restrict_filtered(query, %{user: user})
1148 defp restrict_filtered(query, _), do: query
1150 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1152 defp exclude_poll_votes(query, _) do
1153 if has_named_binding?(query, :object) do
1154 from([activity, object: o] in query,
1155 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1162 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1164 defp exclude_chat_messages(query, _) do
1165 if has_named_binding?(query, :object) do
1166 from([activity, object: o] in query,
1167 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1174 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1176 defp exclude_invisible_actors(query, _opts) do
1178 User.Query.build(%{invisible: true, select: [:ap_id]})
1180 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1182 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity whose id equals `:exclude_id` (only when it is a binary).
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  from(activity in query, where: activity.id != ^id)
end

# No binary `:exclude_id` given: leave the query untouched.
defp exclude_id(query, _), do: query
1191 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1193 defp maybe_preload_objects(query, _) do
1195 |> Activity.with_preloaded_object()
1198 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1200 defp maybe_preload_bookmarks(query, opts) do
1202 |> Activity.with_preloaded_bookmark(opts[:user])
1205 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1207 |> Activity.with_preloaded_report_notes()
1210 defp maybe_preload_report_notes(query, _), do: query
1212 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1214 defp maybe_set_thread_muted_field(query, opts) do
1216 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Applies an explicit ordering on `:id` when `opts` carries `order: :desc` or
# `order: :asc`; otherwise returns the query unchanged.
# NOTE(review): the pipe head `query` was missing from the damaged listing and
# is restored as the only value the `|> order_by(...)` stage can apply to.
defp maybe_order(query, %{order: :desc}) do
  query
  |> order_by(desc: :id)
end

defp maybe_order(query, %{order: :asc}) do
  query
  |> order_by(asc: :id)
end

defp maybe_order(query, _), do: query
1231 defp normalize_fetch_activities_query_opts(opts) do
1232 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1234 value when is_bitstring(value) ->
1235 Map.put(opts, key, Hashtag.normalize_name(value))
1237 value when is_list(value) ->
1240 |> Enum.map(&Hashtag.normalize_name/1)
1243 Map.put(opts, key, normalized_value)
1251 defp fetch_activities_query_ap_ids_ops(opts) do
1252 source_user = opts[:muting_user]
1253 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1255 ap_id_relationships =
1256 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1257 [:block | ap_id_relationships]
1262 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1264 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1265 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1267 restrict_muted_reblogs_opts =
1268 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1270 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1273 def fetch_activities_query(recipients, opts \\ %{}) do
1274 opts = normalize_fetch_activities_query_opts(opts)
1276 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1277 fetch_activities_query_ap_ids_ops(opts)
1280 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1285 |> maybe_preload_objects(opts)
1286 |> maybe_preload_bookmarks(opts)
1287 |> maybe_preload_report_notes(opts)
1288 |> maybe_set_thread_muted_field(opts)
1289 |> maybe_order(opts)
1290 |> restrict_recipients(recipients, opts[:user])
1291 |> restrict_replies(opts)
1292 |> restrict_since(opts)
1293 |> restrict_local(opts)
1294 |> restrict_remote(opts)
1295 |> restrict_actor(opts)
1296 |> restrict_type(opts)
1297 |> restrict_state(opts)
1298 |> restrict_favorited_by(opts)
1299 |> restrict_blocked(restrict_blocked_opts)
1300 |> restrict_muted(restrict_muted_opts)
1301 |> restrict_filtered(opts)
1302 |> restrict_media(opts)
1303 |> restrict_visibility(opts)
1304 |> restrict_thread_visibility(opts, config)
1305 |> restrict_reblogs(opts)
1306 |> restrict_pinned(opts)
1307 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1308 |> restrict_instance(opts)
1309 |> restrict_announce_object_actor(opts)
1310 |> restrict_filtered(opts)
1311 |> Activity.restrict_deactivated_users()
1312 |> exclude_poll_votes(opts)
1313 |> exclude_chat_messages(opts)
1314 |> exclude_invisible_actors(opts)
1315 |> exclude_visibility(opts)
1317 if Config.feature_enabled?(:improved_hashtag_timeline) do
1319 |> restrict_hashtag_any(opts)
1320 |> restrict_hashtag_all(opts)
1321 |> restrict_hashtag_reject_any(opts)
1324 |> restrict_embedded_tag_any(opts)
1325 |> restrict_embedded_tag_all(opts)
1326 |> restrict_embedded_tag_reject_any(opts)
# Fetches the activities a user has favourited, ordered by when they were added to favourites.
1333 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1334 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1336 |> Activity.Queries.by_actor()
1337 |> Activity.Queries.by_type("Like")
1338 |> Activity.with_joined_object()
1339 |> Object.with_joined_activity()
1340 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1341 |> order_by([like, _, _], desc_nulls_last: like.id)
1342 |> Pagination.fetch_paginated(
1343 Map.merge(params, %{skip_order: true}),
# Prepends the user's AP id to the "cc" of every activity whose non-empty "bcc"
# contains at least one entry of `list_memberships`. Activities without a
# matching "bcc" are returned untouched, and so is the whole list when
# `list_memberships` is empty or no `%User{}` is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a bare value; wrap it so the result is
        # always a proper list instead of `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
@doc """
Fetches a page of activities bounded by two recipient sets.

The base query is built with an empty recipient list; the recipient restriction
is then applied by `fetch_activities_bounded_query/3`. The paginated result is
returned in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)

  Enum.reverse(page)
end
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data. When `opts[:actor]` is given it is recorded under `"actor"`.

Any non-`{:ok, _}` result of `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])

      %Object{data: object_data}
      |> Repo.insert()

    failure ->
      failure
  end
end
# Extracts a profile URL from the actor's "url" property, which may be a plain
# string, a link map with an "href", or a list of either (the first entry wins).
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url) do
  url
end

defp get_actor_url(%{"href" => href}) when is_binary(href) do
  href
end

defp get_actor_url([first | _rest]) do
  get_actor_url(first)
end

# Covers the empty list as well as every unsupported shape.
defp get_actor_url(_url) do
  nil
end
# Normalizes an actor image ("icon"/"image") into an "Image" map whose "url" is
# a one-element list of link maps. For a list of images only the first entry is
# used; unsupported input yields nil.
defp normalize_image(%{"url" => url}) do
  link = %{"href" => url}

  %{"type" => "Image", "url" => [link]}
end

defp normalize_image([first | _rest]), do: normalize_image(first)

# Covers the empty list as well as every unsupported shape.
defp normalize_image(_), do: nil
# Converts a remote actor object (decoded ActivityPub JSON with string keys)
# into the attribute map used to build or update a remote user.
#
# The actor document comes from another server, so "attachment" and "tag" are
# read defensively: a null value, a single object instead of an array, and
# entries that lack the expected keys are ignored rather than raising.
#
# Also resolves the actor's featured collection through
# fetch_and_prepare_featured_from_ap_id/1 to populate :pinned_objects.
defp object_to_user_data(data) do
  # Profile fields: only "PropertyValue" attachments, reduced to name/value.
  fields =
    for %{"type" => "PropertyValue"} = field <- data |> Map.get("attachment") |> List.wrap() do
      Map.take(field, ["name", "value"])
    end

  # Custom emoji: shortcode (without the surrounding colons) => image URL.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          data |> Map.get("tag") |> List.wrap(),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  # Everything below reads the object as fixed up by the Transmogrifier.
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
@doc """
Fetches the remote following and followers collections of `user` and derives
the follow counters and visibility flags from them.

Returns `{:ok, info}` with `:hide_follows`, `:hide_followers`, `:follower_count`
and `:following_count`, or `{:error, reason}` when any step fails.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    # Anything that is not already an error tuple gets wrapped into one.
    unexpected -> {:error, unexpected}
  end
end
# Returns the counter when it is an integer and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter) do
    counter
  else
    0
  end
end
@doc """
Merges freshly fetched follow information into `user_data[:info]` when external
user synchronization is enabled, the user type is "Person" or "Service", and
both collection addresses are present.

Always returns a user data map: the input is returned unchanged when a
precondition is not met or when fetching fails (the failure is logged).
"""
# NOTE(review): this reads `user_data[:type]`, while object_to_user_data/1 emits
# `:actor_type` and no `:type` key - for data built there the type check looks
# like it can never pass. Confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)

    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to false: nothing to do.
    {precondition, false}
    when precondition in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
# Decides whether a follow collection is private.
#
# Returns {:ok, false} when the collection's "first" page is embedded or can be
# fetched as a (Ordered)CollectionPage, {:ok, true} when there is no "first" or
# fetching it is answered with 401/403, and {:error, reason} otherwise.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data) do
  {:ok, true}
end
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user data.

Returns `{:ok, user_data}`, or `{:error, result}` where `result` is whatever
non-ok value the filter returned.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
@doc """
Fetches the remote actor at `ap_id`, converts it to user data and, where
applicable, enriches it with follow information.

Returns `{:ok, user_data}` or `{:error, reason}`; failures are logged with a
level depending on the reason.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      log_user_fetch_failure(ap_id, reason)
      {:error, reason}
  end
end

# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = e) do
  Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end

defp log_user_fetch_failure(ap_id, {:reject, reason}) do
  Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
end

defp log_user_fetch_failure(ap_id, e) do
  Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end
@doc """
Frees up `data[:nickname]` when it is already taken by a user with a different
AP id: the old user is renamed to `"<id>.<nickname>"`.

Nothing is changed when there is no nickname, no existing user, or the existing
user has the same AP id (which is only logged).
"""
def maybe_handle_clashing_nickname(data) do
  case data[:nickname] do
    nickname when is_binary(nickname) ->
      case User.get_by_nickname(nickname) do
        %User{} = old_user -> resolve_nickname_clash(old_user, nickname, data)
        _ -> nil
      end

    _ ->
      nil
  end
end

# Renames the old holder of the nickname unless it is the very same actor.
defp resolve_nickname_clash(old_user, nickname, data) do
  if data[:ap_id] == old_user.ap_id do
    Logger.info(
      "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
    )
  else
    new_ap_id = data[:ap_id]

    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{new_ap_id}, renaming."
    )

    old_user
    |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
    |> User.update_and_set_cache()
  end
end
@doc """
Builds the pinned-objects map (`object AP id => current UTC time`) from a
featured collection.

Items of `"orderedItems"` may be objects carrying an `"id"` or bare AP id
strings; items of any other shape are skipped. A collection that is not an
`OrderedCollection`/`Collection` with a list of `"orderedItems"` is logged and
yields an empty map instead of raising.
"""
def pin_data_from_featured_collection(%{
      "type" => type,
      "orderedItems" => objects
    })
    when type in ["OrderedCollection", "Collection"] and is_list(objects) do
  objects
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [object_ap_id]
    # Items may be referenced by bare IRI instead of being embedded.
    object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
    _malformed -> []
  end)
  |> Map.new(fn object_ap_id -> {object_ap_id, NaiveDateTime.utc_now()} end)
end

def pin_data_from_featured_collection(collection) do
  Logger.error("Could not parse featured collection #{inspect(collection)}")
  %{}
end
# Resolves an actor's featured collection into pinned-object data.
#
# The first clause handles an actor without a featured address (nil).
# NOTE(review): its return value is not visible in this excerpt; the caller in
# object_to_user_data/1 matches on `{:ok, pinned_objects}` - confirm.
1627 def fetch_and_prepare_featured_from_ap_id(nil) do
# Fetches the collection at `ap_id` and, on success, returns
# `{:ok, pins}` with `pins` built by pin_data_from_featured_collection/1.
# Any other fetch result is logged as an error.
# NOTE(review): the value returned after logging is not visible here - confirm
# it still satisfies the `{:ok, _}` match in object_to_user_data/1.
1631 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1632 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1633 {:ok, pin_data_from_featured_collection(data)}
1636 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
@doc """
Makes sure every pinned object of the given user data is available locally,
fetching the ones that are not cached.

Returns `:ok` when all pinned objects are cached or were fetched successfully,
`:error` otherwise, and `nil` when given `nil`.
"""
def pinned_fetch_task(nil) do
  nil
end

def pinned_fetch_task(%{pinned_objects: pins}) do
  # A cache hit short-circuits the remote fetch for that object.
  available? = fn {ap_id, _value} ->
    Object.get_cached_by_ap_id(ap_id) ||
      match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  end

  case Enum.all?(pins, available?) do
    true -> :ok
    false -> :error
  end
end
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

A cached user that is not yet AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched; a cached user is updated with the fetched data,
while an unknown one is inserted after resolving a possible nickname clash.
Pinned objects are fetched in a separate task. A failed fetch is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

      if cached_user do
        update_remote_user(cached_user, data)
      else
        insert_remote_user(data)
      end
    end
  end
end

# Applies freshly fetched data to an already known remote user.
defp update_remote_user(user, data) do
  changeset = User.remote_user_changeset(user, data)

  User.update_and_set_cache(changeset)
end

# Inserts a previously unknown remote user, renaming any old holder of the
# nickname first.
defp insert_remote_user(data) do
  maybe_handle_clashing_nickname(data)

  data
  |> User.remote_user_changeset()
  |> Repo.insert()
  |> User.set_cache()
end
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user behind
the returned AP id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields no
AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: true only when the entire thread of the activity
# is visible for the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a specific activity; currently this only applies
# the broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1697 def fetch_direct_messages_query do
1699 |> restrict_type(%{type: "Create"})
1700 |> restrict_visibility(%{visibility: "direct"})
1701 |> order_by([activity], asc: activity.id)