1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
68 defp check_actor_can_insert(_), do: true
# Checks the length of an embedded object's "content" against the configured
# `[:instance, :remote_limit]`.
#
# Returns the result of `String.length(content) <= limit` for string content,
# and `true` for every other input (no embedded object map, no "content" key,
# or content that is not a binary).
#
# The guard is `is_binary/1` rather than `not is_nil/1`: `String.length/1`
# raises on non-binaries, so non-string content now falls through to the
# catch-all clause instead of crashing the caller.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
@doc """
Calls `User.increase_note_count/1` on `actor` when `object` is public.

For a non-public `object` the actor is returned unchanged as `{:ok, actor}`.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` on `actor` when `object` is public.

For a non-public `object` the actor is returned unchanged as `{:ok, actor}`.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.update_last_status_at/1` on `actor` when `object` is public.

For a non-public `object` the actor is returned unchanged as `{:ok, actor}`.
"""
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
98 defp increase_replies_count_if_reply(_create_data), do: :noop
100 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
113 Repo.insert(%Activity{
116 recipients: recipients,
117 actor: object["actor"]
# TODO: add tests for expired activities once the Note type is supported in the new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
149 %Activity{} = activity ->
155 {:containment, _} = error ->
158 {:error, _} = error ->
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
166 recipients: recipients,
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
181 defp insert_activity_with_expiration(data, local, recipients) do
185 actor: data["actor"],
186 recipients: recipients
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
200 stream_out_participations(participations)
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
225 defp get_participations({:ok, conversation}) do
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
231 defp get_participations(_), do: []
233 def stream_out_participations(participations) do
236 |> Repo.preload(:user)
238 Streamer.stream("participation", participations)
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
259 def stream_out_participations(_, _), do: :noop
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
270 def stream_out(_activity) do
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
290 %{to: to, actor: actor, published: published, context: context, object: object},
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
305 {:quick_insert, true, activity} ->
308 {:fake, true, activity} ->
312 Repo.rollback(message)
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
390 {:error, error} -> Repo.rollback(error)
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id,
401 "to" => [origin.follower_address]
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_blockers_visibility(opts)
435 |> restrict_recipients(recipients, opts[:user])
436 |> restrict_filtered(opts)
440 "?->>'type' = ? and ?->>'context' = ?",
447 |> exclude_poll_votes(opts)
449 |> order_by([activity], desc: activity.id)
452 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
453 def fetch_activities_for_context(context, opts \\ %{}) do
455 |> fetch_activities_for_context_query(opts)
459 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
460 FlakeId.Ecto.CompatType.t() | nil
461 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
463 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
464 |> restrict_visibility(%{visibility: "direct"})
# Delegates to `Pagination.fetch_paginated/3` with `:skip_extra_order` set to
# `true` in the options.
#
# Tag-filtering functions may apply "ORDER BY objects.id DESC"; an additional
# sort on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
478 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
479 list_memberships = Pleroma.List.memberships(opts[:user])
481 fetch_activities_query(recipients ++ list_memberships, opts)
482 |> fetch_paginated_optimized(opts, pagination)
484 |> maybe_update_cc(list_memberships, opts[:user])
487 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
488 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
489 opts = Map.delete(opts, :user)
491 [Constants.as_public()]
492 |> fetch_activities_query(opts)
493 |> restrict_unlisted(opts)
494 |> fetch_paginated_optimized(opts, pagination)
497 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
498 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
500 |> Map.put(:restrict_unlisted, true)
501 |> fetch_public_or_unlisted_activities(pagination)
504 @valid_visibilities ~w[direct unlisted public private]
506 defp restrict_visibility(query, %{visibility: visibility})
507 when is_list(visibility) do
508 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
513 "activity_visibility(?, ?, ?) = ANY (?)",
521 Logger.error("Could not restrict visibility to #{visibility}")
525 defp restrict_visibility(query, %{visibility: visibility})
526 when visibility in @valid_visibilities do
530 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
534 defp restrict_visibility(_query, %{visibility: visibility})
535 when visibility not in @valid_visibilities do
536 Logger.error("Could not restrict visibility to #{visibility}")
539 defp restrict_visibility(query, _visibility), do: query
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when is_list(visibility) do
543 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
548 "activity_visibility(?, ?, ?) = ANY (?)",
556 Logger.error("Could not exclude visibility to #{visibility}")
561 defp exclude_visibility(query, %{exclude_visibilities: visibility})
562 when visibility in @valid_visibilities do
567 "activity_visibility(?, ?, ?) = ?",
576 defp exclude_visibility(query, %{exclude_visibilities: visibility})
577 when visibility not in [nil | @valid_visibilities] do
578 Logger.error("Could not exclude visibility to #{visibility}")
582 defp exclude_visibility(query, _visibility), do: query
584 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
587 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
590 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
593 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
597 defp restrict_thread_visibility(query, _, _), do: query
599 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
602 |> Map.put(:user, reading_user)
603 |> Map.put(:actor_id, user.ap_id)
606 godmode: params[:godmode],
607 reading_user: reading_user
609 |> user_activities_recipients()
610 |> fetch_activities(params)
614 def fetch_user_activities(user, reading_user, params \\ %{})
616 def fetch_user_activities(user, reading_user, %{total: true} = params) do
617 result = fetch_activities_for_user(user, reading_user, params)
619 Keyword.put(result, :items, Enum.reverse(result[:items]))
622 def fetch_user_activities(user, reading_user, params) do
624 |> fetch_activities_for_user(reading_user, params)
628 defp fetch_activities_for_user(user, reading_user, params) do
631 |> Map.put(:type, ["Create", "Announce"])
632 |> Map.put(:user, reading_user)
633 |> Map.put(:actor_id, user.ap_id)
634 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
637 if User.blocks?(reading_user, user) do
641 |> Map.put(:blocking_user, reading_user)
642 |> Map.put(:muting_user, reading_user)
645 pagination_type = Map.get(params, :pagination_type) || :keyset
648 godmode: params[:godmode],
649 reading_user: reading_user
651 |> user_activities_recipients()
652 |> fetch_activities(params, pagination_type)
655 def fetch_statuses(reading_user, %{total: true} = params) do
656 result = fetch_activities_for_reading_user(reading_user, params)
657 Keyword.put(result, :items, Enum.reverse(result[:items]))
660 def fetch_statuses(reading_user, params) do
662 |> fetch_activities_for_reading_user(params)
666 defp fetch_activities_for_reading_user(reading_user, params) do
667 params = Map.put(params, :type, ["Create", "Announce"])
670 godmode: params[:godmode],
671 reading_user: reading_user
673 |> user_activities_recipients()
674 |> fetch_activities(params, :offset)
677 defp user_activities_recipients(%{godmode: true}), do: []
679 defp user_activities_recipients(%{reading_user: reading_user}) do
681 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
683 [Constants.as_public()]
687 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
688 raise "Can't use the child object without preloading!"
691 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
693 [activity, object] in query,
696 "?->>'type' != ? or ?->>'actor' != ?",
705 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
#
# The query is returned untouched when `:since_id` is absent or is the empty
# string.
defp restrict_since(query, %{since_id: since_id}) when since_id != "" do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
715 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
716 raise_on_missing_preload()
719 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
721 [_activity, object] in query,
722 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
726 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
727 restrict_embedded_tag_any(query, %{tag: tag})
730 defp restrict_embedded_tag_all(query, _), do: query
732 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
733 raise_on_missing_preload()
736 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
738 [_activity, object] in query,
739 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
743 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
744 restrict_embedded_tag_any(query, %{tag: [tag]})
747 defp restrict_embedded_tag_any(query, _), do: query
749 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
750 raise_on_missing_preload()
753 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
755 [_activity, object] in query,
756 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
760 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
761 when is_binary(tag_reject) do
762 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
765 defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query selecting the distinct `object_id`s from "hashtags_objects"
# whose joined hashtag name is one of `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
775 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
776 raise_on_missing_preload()
779 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
780 restrict_hashtag_any(query, %{tag: single_tag})
783 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
785 [_activity, object] in query,
789 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
790 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
791 AND hashtags_objects.object_id = ?) @> ?
800 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
801 restrict_hashtag_all(query, %{tag_all: [tag]})
804 defp restrict_hashtag_all(query, _), do: query
806 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
807 raise_on_missing_preload()
810 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
812 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
815 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
817 [_activity, object] in query,
818 join: hto in "hashtags_objects",
819 on: hto.object_id == object.id,
820 where: hto.hashtag_id in ^hashtag_ids,
821 distinct: [desc: object.id],
822 order_by: [desc: object.id]
826 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
827 restrict_hashtag_any(query, %{tag: [tag]})
830 defp restrict_hashtag_any(query, _), do: query
832 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
833 raise_on_missing_preload()
836 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
838 [_activity, object] in query,
839 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
843 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
844 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
847 defp restrict_hashtag_reject_any(query, _), do: query
# Raises a `RuntimeError`; called by filters that need the joined child object
# when the caller asked to skip preloading.
defp raise_on_missing_preload,
  do: raise(RuntimeError, "Can't use the child object without preloading!")
853 defp restrict_recipients(query, [], _user), do: query
855 defp restrict_recipients(query, recipients, nil) do
856 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
859 defp restrict_recipients(query, recipients, user) do
862 where: fragment("? && ?", ^recipients, activity.recipients),
863 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities whose `local` column is true;
# any other options leave the query unchanged.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
# With `remote: true`, keeps only activities whose `local` column is false;
# any other options leave the query unchanged.
defp restrict_remote(query, %{remote: true}),
  do: where(query, [activity], activity.local == false)

defp restrict_remote(query, _opts), do: query
# When `:actor_id` is given, keeps only activities whose `actor` column equals
# it; otherwise the query is returned unchanged.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
# Filters on the activity's `data->>'type'`.
#
# A binary `:type` is compared for equality; any other `:type` value is passed
# to `= ANY(?)`. Without a `:type` key the query is returned unchanged.
defp restrict_type(query, %{type: type}) when is_binary(type),
  do: where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))

defp restrict_type(query, %{type: types}),
  do: where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))

defp restrict_type(query, _opts), do: query
# When `:state` is given, keeps only activities whose `data->>'state'` equals
# it; otherwise the query is returned unchanged.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _opts), do: query
901 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
903 [_activity, object] in query,
904 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
908 defp restrict_favorited_by(query, _), do: query
910 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
911 raise "Can't use the child object without preloading!"
914 defp restrict_media(query, %{only_media: true}) do
916 [activity, object] in query,
917 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
918 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
922 defp restrict_media(query, _), do: query
924 defp restrict_replies(query, %{exclude_replies: true}) do
926 [_activity, object] in query,
927 where: fragment("?->>'inReplyTo' is null", object.data)
931 defp restrict_replies(query, %{
932 reply_filtering_user: %User{} = user,
933 reply_visibility: "self"
936 [activity, object] in query,
939 "?->>'inReplyTo' is null OR ? = ANY(?)",
947 defp restrict_replies(query, %{
948 reply_filtering_user: %User{} = user,
949 reply_visibility: "following"
952 [activity, object] in query,
956 ?->>'type' != 'Create' -- This isn't a Create
957 OR ?->>'inReplyTo' is null -- this isn't a reply
958 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
959 -- unless they are the author (because authors
960 -- are also part of the recipients). This leads
961 -- to a bug that self-replies by friends won't
963 OR ? = ? -- The actor is us
967 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
976 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose `data->>'type'` is
# 'Announce'; any other options leave the query unchanged.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _opts), do: query
984 defp restrict_muted(query, %{with_muted: true}), do: query
986 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
987 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
990 from([activity] in query,
991 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
994 "not (?->'to' \\?| ?) or ? = ?",
1002 unless opts[:skip_preload] do
1003 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1009 defp restrict_muted(query, _), do: query
1011 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1012 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1013 domain_blocks = user.domain_blocks || []
1015 following_ap_ids = User.get_friends_ap_ids(user)
1018 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1021 [activity, object: o] in query,
1022 # You don't block the author
1023 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1025 # You don't block any recipients, and didn't author the post
1028 "((not (? && ?)) or ? = ?)",
1029 activity.recipients,
1035 # You don't block the domain of any recipients, and didn't author the post
1038 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1039 activity.recipients,
1045 # It's not a boost of a user you block
1048 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1054 # You don't block the author's domain, and also don't follow the author
1057 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1064 # Same as above, but checks the Object
1067 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1076 defp restrict_blocked(query, _), do: query
1078 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1079 if Config.get([:activitypub, :blockers_visible]) == true do
1082 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1086 # The author doesn't block you
1087 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1089 # It's not a boost of a user that blocks you
1092 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1101 defp restrict_blockers_visibility(query, _), do: query
1103 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1108 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1110 ^[Constants.as_public()]
1115 defp restrict_unlisted(query, _), do: query
1117 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1119 [activity, object: o] in query,
1122 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1131 defp restrict_pinned(query, _), do: query
1133 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1134 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1140 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1148 defp restrict_muted_reblogs(query, _), do: query
1150 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1153 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1157 defp restrict_instance(query, _), do: query
1159 defp restrict_filtered(query, %{user: %User{} = user}) do
1160 case Filter.compose_regex(user) do
1165 from([activity, object] in query,
1167 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1168 activity.actor == ^user.ap_id
1173 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1174 restrict_filtered(query, %{user: user})
1177 defp restrict_filtered(query, _), do: query
1179 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1181 defp exclude_poll_votes(query, _) do
1182 if has_named_binding?(query, :object) do
1183 from([activity, object: o] in query,
1184 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1191 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1193 defp exclude_chat_messages(query, _) do
1194 if has_named_binding?(query, :object) do
1195 from([activity, object: o] in query,
1196 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1203 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1205 defp exclude_invisible_actors(query, _opts) do
1207 User.Query.build(%{invisible: true, select: [:ap_id]})
1209 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1211 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# When `:exclude_id` is a binary, drops the activity with that id; otherwise
# the query is returned unchanged.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _opts), do: query
1220 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1222 defp maybe_preload_objects(query, _) do
1224 |> Activity.with_preloaded_object()
1227 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1229 defp maybe_preload_bookmarks(query, opts) do
1231 |> Activity.with_preloaded_bookmark(opts[:user])
1234 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1236 |> Activity.with_preloaded_report_notes()
1239 defp maybe_preload_report_notes(query, _), do: query
1241 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1243 defp maybe_set_thread_muted_field(query, opts) do
1245 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1248 defp maybe_order(query, %{order: :desc}) do
1250 |> order_by(desc: :id)
1253 defp maybe_order(query, %{order: :asc}) do
1255 |> order_by(asc: :id)
1258 defp maybe_order(query, _), do: query
1260 defp normalize_fetch_activities_query_opts(opts) do
1261 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1263 value when is_bitstring(value) ->
1264 Map.put(opts, key, Hashtag.normalize_name(value))
1266 value when is_list(value) ->
1269 |> Enum.map(&Hashtag.normalize_name/1)
1272 Map.put(opts, key, normalized_value)
1280 defp fetch_activities_query_ap_ids_ops(opts) do
1281 source_user = opts[:muting_user]
1282 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1284 ap_id_relationships =
1285 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1286 [:block | ap_id_relationships]
1291 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1293 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1294 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1296 restrict_muted_reblogs_opts =
1297 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1299 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
@doc """
Builds the query used to fetch activities for `recipients`.

`opts` is first normalized (hashtag options), then threaded through the preload,
ordering, `restrict_*` and `exclude_*` steps below. Block / mute / reblog-mute
restrictions receive copies of `opts` that carry preloaded relationship AP ids.

Depending on the `:improved_hashtag_timeline` feature flag, hashtag filtering is
done with either the `restrict_hashtag_*` or the `restrict_embedded_tag_*` steps.

Returns the composed query; nothing is executed here.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered(opts)` used to be piped twice with identical arguments; it is
  # applied once now. NOTE(review): assumes the step only adds a filter condition, so a
  # second identical application is redundant — confirm against restrict_filtered/2.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
@doc """
Fetches the activities a user has favourited, ordered by when they were favourited.

The query starts from the user's "Like" activities and joins the liked object and
its activity. Each returned activity has the joined object set as `:object` and the
id of the like set as `:pagination_id`; results are ordered by that like id,
descending. `params` is passed on to `Pagination.fetch_paginated/3` with
`skip_order: true` added, so the ordering above is the one that applies.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  paging_params = Map.put(params, :skip_order, true)

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(favourites_query, paging_params, pagination)
end
# Makes list-addressed activities look addressed to the user.
#
# For every activity whose "bcc" contains at least one entry of `list_memberships`,
# the user's AP id is prepended to the activity's "cc". All other activities are
# returned unchanged. When `list_memberships` is not a non-empty list, or the third
# argument is not a `%User{}`, `activities` is returned as is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # List.wrap/1 keeps "cc" a proper list even when the activity has no "cc" (nil)
        # or a single non-list value; `[id | nil]` would build an improper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities whose recipients overlap (`&&`) with `recipients`,
# or whose recipients overlap with `recipients_with_public` while also containing
# the public address (`Constants.as_public/0`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public in activity.recipients)
  )
end
# Fetches one page of activities using the bounded recipient filter of
# `fetch_activities_bounded_query/3` on top of `fetch_activities_query/2` (built with
# an empty recipient list, since the bounded filter does the recipient matching).
# The paginated result is reversed before it is returned.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the resulting
# data. When `opts[:actor]` is present it is recorded under the "actor" key of that
# data. Anything other than `{:ok, data}` from the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
# Extracts a URL string from the "url" property of an actor document.
#
# Accepts a plain string, a link map with a string "href", or a list of such values
# (only the first element is considered). Everything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
# Normalizes an actor image ("icon" / "image") into the shape
# `%{"type" => "Image", "url" => [%{"href" => url}]}`.
#
# A list is reduced to its first element; anything without a "url" key yields nil.
defp normalize_image(%{"url" => url}), do: %{"type" => "Image", "url" => [%{"href" => url}]}
defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_), do: nil
# Converts a remote actor document (string-keyed map) into the atom-keyed attribute
# map used to create or update a remote user.
#
# Besides copying/normalizing plain properties, this
#   * extracts profile fields from "PropertyValue" attachments,
#   * extracts custom emoji (shortcode => url) from "Emoji" tags,
#   * resolves the featured collection into `:pinned_objects`
#     (via `fetch_and_prepare_featured_from_ap_id/1`),
#   * derives `:nickname` as `preferredUsername@host-of-id`, or nil when the actor has
#     no "preferredUsername".
#
# The document is remote input, so attachments / tags that do not have the expected
# shape are skipped instead of raising.
defp object_to_user_data(data) do
  # List.wrap/1 also covers a missing key (nil) and a single, non-list value.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # These two are read before the document is passed through the Transmogrifier fix-up.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  # Both stay nil unless the nested value is present and a string.
  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
# Fetches the remote following and follower collections of `user` (in that order) and
# summarizes them.
#
# Returns `{:ok, %{hide_follows:, follower_count:, following_count:, hide_followers:}}`
# where the counts come from each collection's "totalItems" (0 when not an integer)
# and the hide flags from `collection_private/1`. An `{:error, _}` from any step is
# returned as is; any other unexpected value is wrapped as `{:error, value}`.
def fetch_follow_information_for_user(user) do
  with {:ok, following, hide_follows} <- fetch_follow_collection(user.following_address),
       {:ok, followers, hide_followers} <- fetch_follow_collection(user.follower_address) do
    summary = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, summary}
  else
    {:error, _} = error -> error
    unexpected -> {:error, unexpected}
  end
end

# Fetches one follow collection and determines whether it is private.
# Non-matching results of either step are passed through untouched.
defp fetch_follow_collection(address) do
  with {:ok, collection} <- Fetcher.fetch_and_contain_remote_object_from_id(address),
       {:ok, private?} <- collection_private(collection) do
    {:ok, collection, private?}
  end
end
# Returns `counter` when it is an integer and 0 for any other value
# (e.g. a missing "totalItems").
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Adds follower/following information fetched from the remote collections to
# `user_data`, when all of the following hold:
#   * `[:instance, :external_user_synchronization]` is enabled,
#   * the actor type is "Person" or "Service",
#   * both `:following_address` and `:follower_address` are set.
#
# The fetched values are merged into `user_data[:info]`. In every other case,
# including a failed fetch (which is logged as an error), `user_data` is returned
# unchanged.
def maybe_update_follow_information(user_data) do
  # The attribute map built from an actor document stores the type under
  # `:actor_type`; `:type` is still honoured for callers that pass it that way.
  # Reading only `:type` made this check fail for every freshly fetched actor.
  actor_type = user_data[:actor_type] || user_data[:type]

  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, actor_type in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <-
         fetch_follow_information_for_user(user_data) do
    # NOTE(review): the result is stored under `:info`; confirm the consumer of this
    # map still reads that key.
    info = Map.merge(user_data[:info] || %{}, info)

    user_data
    |> Map.put(:info, info)
  else
    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    {:enabled, false} ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
# Decides whether a follow collection is private (hidden).
#
#   * "first" is an embedded page                  -> {:ok, false}
#   * "first" is a reference that can be fetched
#     and is a collection page                     -> {:ok, false}
#   * fetching "first" is answered with 401 / 403  -> {:ok, true}
#   * any other fetch failure                      -> {:error, reason}
#   * the collection has no "first" at all         -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
# Runs the actor document through `MRF.filter/1` and, when it passes, converts it
# with `object_to_user_data/1`. Any other filter result is returned wrapped as
# `{:error, result}`.
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
# Fetches the actor document at `ap_id`, converts it into user attributes and lets
# `maybe_update_follow_information/1` extend them.
#
# Returns `{:ok, user_data}` or `{:error, reason}`; the failure is logged with a
# level that depends on the reason before it is returned.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      log_user_fetch_failure(ap_id, reason)
      {:error, reason}
  end
end

# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = e),
  do: Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

defp log_user_fetch_failure(ap_id, {:reject, reason}),
  do: Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

defp log_user_fetch_failure(ap_id, e),
  do: Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees up `data[:nickname]` when it is already used by a different actor.
#
# If a stored user has that nickname but another AP id, the stored user is renamed to
# "<id>.<nickname>" and the update result is returned. If the AP ids are equal, only
# an info message is logged. Without a string nickname or a matching stored user,
# nothing happens and nil is returned.
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  old_user = if is_binary(nickname), do: User.get_by_nickname(nickname)

  case old_user do
    %User{ap_id: old_ap_id} ->
      if data[:ap_id] == old_ap_id do
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

        old_user
        |> User.remote_user_changeset(renamed)
        |> User.update_and_set_cache()
      end

    _ ->
      nil
  end
end
# Turns a featured ("pinned posts") collection into a map of
# `object_ap_id => pin timestamp` (`NaiveDateTime.utc_now/0` at conversion time).
#
# An "OrderedCollection" with a "first" page has that page fetched and its
# "orderedItems" converted; other "OrderedCollection" / "Collection" documents are
# resolved through `Collections.Fetcher.fetch_collection/1`.
#
# Always returns a plain map (the caller wraps it in `{:ok, _}`): fetch failures are
# logged and yield `%{}`. Items may be objects with an "id" or bare id strings;
# entries of any other shape are skipped rather than raising.
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, page} when is_map(page) ->
      featured_items_to_pins(page["orderedItems"])

    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      featured_items_to_pins(objects)

    e ->
      Logger.error(
        "Could not decode featured collection at fetch #{inspect(collection["id"])}, #{inspect(e)}"
      )

      %{}
  end
end

# Builds the pin map from collection items; nil / a single item are tolerated.
defp featured_items_to_pins(items) do
  items
  |> List.wrap()
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) ->
      [{object_ap_id, NaiveDateTime.utc_now()}]

    object_ap_id when is_binary(object_ap_id) ->
      [{object_ap_id, NaiveDateTime.utc_now()}]

    _other ->
      []
  end)
  |> Map.new()
end
# Resolves the featured collection at `ap_id` into a pin map.
#
# Always returns `{:ok, map}`: an empty map when there is no featured address (nil)
# or when the collection cannot be fetched (the failure is logged as an error).
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
# Makes sure the pinned objects of freshly fetched user data are available locally.
#
# Each pin is looked up in the object cache and fetched by id when it is not cached.
# Returns :ok when every pin is available and :error otherwise; checking stops at the
# first pin that cannot be obtained. nil input is a no-op returning nil.
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  available? = fn ap_id ->
    Object.get_cached_by_ap_id(ap_id) ||
      match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  end

  all_available? = Enum.all?(pins, fn {ap_id, _} -> available?.(ap_id) end)

  if all_available?, do: :ok, else: :error
end
# Creates or refreshes the local record of the remote actor at `ap_id`.
#
#   * A cached user that is not AP-enabled is upgraded through the Transmogrifier.
#   * Otherwise the actor is fetched; a task is started to fetch its pinned objects,
#     then the cached user is updated or, when there is none, a new user is inserted
#     (after resolving a possible nickname clash) and cached.
#
# A failed fetch is returned unchanged.
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

          if cached_user do
            cached_user
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
          else
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          failure
      end
  end
end
# Resolves `nickname` through WebFinger and creates/refreshes the user behind the
# resulting AP id. Returns `{:error, "No AP id in WebFinger"}` when the lookup fails
# or yields no AP id.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _e ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: the result is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity; currently this only applies the
# broken-thread containment check for `user`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1742 def fetch_direct_messages_query do
1744 |> restrict_type(%{type: "Create"})
1745 |> restrict_visibility(%{visibility: "direct"})
1746 |> order_by([activity], asc: activity.id)