1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
67 defp check_actor_can_insert(_), do: true
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
74 defp check_remote_limit(_), do: true
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
84 def update_last_status_at_if_public(actor, object) do
85 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
88 defp increase_replies_count_if_reply(%{
89 "object" => %{"inReplyTo" => reply_ap_id} = object,
92 if is_public?(object) do
93 Object.increase_replies_count(reply_ap_id)
97 defp increase_replies_count_if_reply(_create_data), do: :noop
99 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
101 def persist(%{"type" => type} = object, meta) when type in @object_types do
102 with {:ok, object} <- Object.create(object) do
108 def persist(object, meta) do
109 with local <- Keyword.fetch!(meta, :local),
110 {recipients, _, _} <- get_recipients(object),
112 Repo.insert(%Activity{
115 recipients: recipients,
116 actor: object["actor"]
118 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
119 {:ok, _} <- maybe_create_activity_expiration(activity) do
120 {:ok, activity, meta}
124 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
125 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
128 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
129 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
134 {:ok, map, object} <- insert_full_object(map),
135 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
136 # Splice in the child object if we have one.
137 activity = Maps.put_if_present(activity, :object, object)
139 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
140 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
143 # Add local posts to search index
144 if local, do: Pleroma.Search.add_to_index(activity)
148 %Activity{} = activity ->
154 {:containment, _} = error ->
157 {:error, _} = error ->
160 {:fake, true, map, recipients} ->
161 activity = %Activity{
165 recipients: recipients,
169 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
172 {:remote_limit_pass, _} ->
173 {:error, :remote_limit}
180 defp insert_activity_with_expiration(data, local, recipients) do
184 actor: data["actor"],
185 recipients: recipients
188 with {:ok, activity} <- Repo.insert(struct) do
189 maybe_create_activity_expiration(activity)
193 def notify_and_stream(activity) do
194 Notification.create_notifications(activity)
196 conversation = create_or_bump_conversation(activity, activity.actor)
197 participations = get_participations(conversation)
199 stream_out_participations(participations)
202 defp maybe_create_activity_expiration(
203 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
206 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
207 activity_id: activity.id,
208 expires_at: expires_at
214 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216 defp create_or_bump_conversation(activity, actor) do
217 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
218 %User{} = user <- User.get_cached_by_ap_id(actor) do
219 Participation.mark_as_read(user, conversation)
224 defp get_participations({:ok, conversation}) do
226 |> Repo.preload(:participations, force: true)
227 |> Map.get(:participations)
230 defp get_participations(_), do: []
232 def stream_out_participations(participations) do
235 |> Repo.preload(:user)
237 Streamer.stream("participation", participations)
241 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
242 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
243 conversation = Repo.preload(conversation, :participations)
246 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
251 if last_activity_id do
252 stream_out_participations(conversation.participations)
258 def stream_out_participations(_, _), do: :noop
261 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
262 when data_type in ["Create", "Announce", "Delete"] do
264 |> Topics.get_activity_topics()
265 |> Streamer.stream(activity)
269 def stream_out(_activity) do
273 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
274 def create(params, fake \\ false) do
275 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
280 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
285 quick_insert? = Config.get([:env]) == :benchmark
289 %{to: to, actor: actor, published: published, context: context, object: object},
293 with {:ok, activity} <- insert(create_data, local, fake),
294 {:fake, false, activity} <- {:fake, fake, activity},
295 _ <- increase_replies_count_if_reply(create_data),
296 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
297 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
298 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
299 _ <- notify_and_stream(activity),
300 :ok <- maybe_schedule_poll_notifications(activity),
301 :ok <- maybe_federate(activity) do
304 {:quick_insert, true, activity} ->
307 {:fake, true, activity} ->
311 Repo.rollback(message)
315 defp maybe_schedule_poll_notifications(activity) do
316 PollWorker.schedule_poll_end(activity)
320 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
321 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
322 additional = params[:additional] || %{}
323 # only accept false as false value
324 local = !(params[:local] == false)
325 published = params[:published]
329 %{to: to, actor: actor, published: published, context: context, object: object},
333 with {:ok, activity} <- insert(listen_data, local),
334 _ <- notify_and_stream(activity),
335 :ok <- maybe_federate(activity) do
340 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
341 {:ok, Activity.t()} | nil | {:error, any()}
342 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
343 with {:ok, result} <-
344 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
349 defp do_unfollow(follower, followed, activity_id, local) do
350 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
351 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
352 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
353 {:ok, activity} <- insert(unfollow_data, local),
354 _ <- notify_and_stream(activity),
355 :ok <- maybe_federate(activity) do
359 {:error, error} -> Repo.rollback(error)
363 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
365 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
379 # only accept false as false value
380 local = !(params[:local] == false)
381 forward = !(params[:forward] == false)
383 additional = params[:additional] || %{}
387 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
389 Map.merge(additional, %{"to" => [], "cc" => []})
392 with flag_data <- make_flag_data(params, additional),
393 {:ok, activity} <- insert(flag_data, local),
394 {:ok, stripped_activity} <- strip_report_status_data(activity),
395 _ <- notify_and_stream(activity),
397 maybe_federate(stripped_activity) do
398 User.all_superusers()
399 |> Enum.filter(fn user -> user.ap_id != actor end)
400 |> Enum.filter(fn user -> not is_nil(user.email) end)
401 |> Enum.each(fn superuser ->
403 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
404 |> Pleroma.Emails.Mailer.deliver_async()
409 {:error, error} -> Repo.rollback(error)
413 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
414 def move(%User{} = origin, %User{} = target, local \\ true) do
417 "actor" => origin.ap_id,
418 "object" => origin.ap_id,
419 "target" => target.ap_id
422 with true <- origin.ap_id in target.also_known_as,
423 {:ok, activity} <- insert(params, local),
424 _ <- notify_and_stream(activity) do
425 maybe_federate(activity)
427 BackgroundWorker.enqueue("move_following", %{
428 "origin_id" => origin.id,
429 "target_id" => target.id
434 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
439 def fetch_activities_for_context_query(context, opts) do
440 public = [Constants.as_public()]
444 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
447 from(activity in Activity)
448 |> maybe_preload_objects(opts)
449 |> maybe_preload_bookmarks(opts)
450 |> maybe_set_thread_muted_field(opts)
451 |> restrict_blocked(opts)
452 |> restrict_blockers_visibility(opts)
453 |> restrict_recipients(recipients, opts[:user])
454 |> restrict_filtered(opts)
458 "?->>'type' = ? and ?->>'context' = ?",
465 |> exclude_poll_votes(opts)
467 |> order_by([activity], desc: activity.id)
470 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
471 def fetch_activities_for_context(context, opts \\ %{}) do
473 |> fetch_activities_for_context_query(opts)
477 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
478 FlakeId.Ecto.CompatType.t() | nil
479 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
481 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
482 |> restrict_visibility(%{visibility: "direct"})
488 defp fetch_paginated_optimized(query, opts, pagination) do
489 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
490 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
491 opts = Map.put(opts, :skip_extra_order, true)
493 Pagination.fetch_paginated(query, opts, pagination)
496 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
497 list_memberships = Pleroma.List.memberships(opts[:user])
499 fetch_activities_query(recipients ++ list_memberships, opts)
500 |> fetch_paginated_optimized(opts, pagination)
502 |> maybe_update_cc(list_memberships, opts[:user])
505 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
506 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
507 opts = Map.delete(opts, :user)
509 [Constants.as_public()]
510 |> fetch_activities_query(opts)
511 |> restrict_unlisted(opts)
512 |> fetch_paginated_optimized(opts, pagination)
515 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
516 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
518 |> Map.put(:restrict_unlisted, true)
519 |> fetch_public_or_unlisted_activities(pagination)
522 @valid_visibilities ~w[direct unlisted public private]
524 defp restrict_visibility(query, %{visibility: visibility})
525 when is_list(visibility) do
526 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
531 "activity_visibility(?, ?, ?) = ANY (?)",
539 Logger.error("Could not restrict visibility to #{visibility}")
543 defp restrict_visibility(query, %{visibility: visibility})
544 when visibility in @valid_visibilities do
548 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
552 defp restrict_visibility(_query, %{visibility: visibility})
553 when visibility not in @valid_visibilities do
554 Logger.error("Could not restrict visibility to #{visibility}")
557 defp restrict_visibility(query, _visibility), do: query
559 defp exclude_visibility(query, %{exclude_visibilities: visibility})
560 when is_list(visibility) do
561 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
566 "activity_visibility(?, ?, ?) = ANY (?)",
574 Logger.error("Could not exclude visibility to #{visibility}")
579 defp exclude_visibility(query, %{exclude_visibilities: visibility})
580 when visibility in @valid_visibilities do
585 "activity_visibility(?, ?, ?) = ?",
594 defp exclude_visibility(query, %{exclude_visibilities: visibility})
595 when visibility not in [nil | @valid_visibilities] do
596 Logger.error("Could not exclude visibility to #{visibility}")
600 defp exclude_visibility(query, _visibility), do: query
602 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
605 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
608 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
611 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
615 defp restrict_thread_visibility(query, _, _), do: query
617 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
620 |> Map.put(:user, reading_user)
621 |> Map.put(:actor_id, user.ap_id)
624 godmode: params[:godmode],
625 reading_user: reading_user
627 |> user_activities_recipients()
628 |> fetch_activities(params)
632 def fetch_user_activities(user, reading_user, params \\ %{})
634 def fetch_user_activities(user, reading_user, %{total: true} = params) do
635 result = fetch_activities_for_user(user, reading_user, params)
637 Keyword.put(result, :items, Enum.reverse(result[:items]))
640 def fetch_user_activities(user, reading_user, params) do
642 |> fetch_activities_for_user(reading_user, params)
646 defp fetch_activities_for_user(user, reading_user, params) do
649 |> Map.put(:type, ["Create", "Announce"])
650 |> Map.put(:user, reading_user)
651 |> Map.put(:actor_id, user.ap_id)
652 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
655 if User.blocks?(reading_user, user) do
659 |> Map.put(:blocking_user, reading_user)
660 |> Map.put(:muting_user, reading_user)
663 pagination_type = Map.get(params, :pagination_type) || :keyset
666 godmode: params[:godmode],
667 reading_user: reading_user
669 |> user_activities_recipients()
670 |> fetch_activities(params, pagination_type)
673 def fetch_statuses(reading_user, %{total: true} = params) do
674 result = fetch_activities_for_reading_user(reading_user, params)
675 Keyword.put(result, :items, Enum.reverse(result[:items]))
678 def fetch_statuses(reading_user, params) do
680 |> fetch_activities_for_reading_user(params)
684 defp fetch_activities_for_reading_user(reading_user, params) do
685 params = Map.put(params, :type, ["Create", "Announce"])
688 godmode: params[:godmode],
689 reading_user: reading_user
691 |> user_activities_recipients()
692 |> fetch_activities(params, :offset)
695 defp user_activities_recipients(%{godmode: true}), do: []
697 defp user_activities_recipients(%{reading_user: reading_user}) do
699 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
701 [Constants.as_public()]
705 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
706 raise "Can't use the child object without preloading!"
709 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
711 [activity, object] in query,
714 "?->>'type' != ? or ?->>'actor' != ?",
723 defp restrict_announce_object_actor(query, _), do: query
725 defp restrict_since(query, %{since_id: ""}), do: query
727 defp restrict_since(query, %{since_id: since_id}) do
728 from(activity in query, where: activity.id > ^since_id)
731 defp restrict_since(query, _), do: query
733 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
734 raise_on_missing_preload()
737 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
739 [_activity, object] in query,
740 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
744 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
745 restrict_embedded_tag_any(query, %{tag: tag})
748 defp restrict_embedded_tag_all(query, _), do: query
750 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
751 raise_on_missing_preload()
754 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
756 [_activity, object] in query,
757 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
761 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
762 restrict_embedded_tag_any(query, %{tag: [tag]})
765 defp restrict_embedded_tag_any(query, _), do: query
767 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
768 raise_on_missing_preload()
771 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
773 [_activity, object] in query,
774 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
778 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
779 when is_binary(tag_reject) do
780 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
783 defp restrict_embedded_tag_reject_any(query, _), do: query
785 defp object_ids_query_for_tags(tags) do
786 from(hto in "hashtags_objects")
787 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
788 |> where([hto, ht], ht.name in ^tags)
789 |> select([hto], hto.object_id)
790 |> distinct([hto], true)
793 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
794 raise_on_missing_preload()
797 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
798 restrict_hashtag_any(query, %{tag: single_tag})
801 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
803 [_activity, object] in query,
807 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
808 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
809 AND hashtags_objects.object_id = ?) @> ?
818 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
819 restrict_hashtag_all(query, %{tag_all: [tag]})
822 defp restrict_hashtag_all(query, _), do: query
824 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
825 raise_on_missing_preload()
828 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
830 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
833 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
835 [_activity, object] in query,
836 join: hto in "hashtags_objects",
837 on: hto.object_id == object.id,
838 where: hto.hashtag_id in ^hashtag_ids,
839 distinct: [desc: object.id],
840 order_by: [desc: object.id]
844 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
845 restrict_hashtag_any(query, %{tag: [tag]})
848 defp restrict_hashtag_any(query, _), do: query
850 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
851 raise_on_missing_preload()
854 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
856 [_activity, object] in query,
857 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
861 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
862 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
865 defp restrict_hashtag_reject_any(query, _), do: query
867 defp raise_on_missing_preload do
868 raise "Can't use the child object without preloading!"
871 defp restrict_recipients(query, [], _user), do: query
873 defp restrict_recipients(query, recipients, nil) do
874 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
877 defp restrict_recipients(query, recipients, user) do
880 where: fragment("? && ?", ^recipients, activity.recipients),
881 or_where: activity.actor == ^user.ap_id
885 defp restrict_local(query, %{local_only: true}) do
886 from(activity in query, where: activity.local == true)
889 defp restrict_local(query, _), do: query
891 defp restrict_remote(query, %{remote: true}) do
892 from(activity in query, where: activity.local == false)
895 defp restrict_remote(query, _), do: query
897 defp restrict_actor(query, %{actor_id: actor_id}) do
898 from(activity in query, where: activity.actor == ^actor_id)
901 defp restrict_actor(query, _), do: query
903 defp restrict_type(query, %{type: type}) when is_binary(type) do
904 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
907 defp restrict_type(query, %{type: type}) do
908 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
911 defp restrict_type(query, _), do: query
913 defp restrict_state(query, %{state: state}) do
914 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
917 defp restrict_state(query, _), do: query
919 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
921 [_activity, object] in query,
922 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
926 defp restrict_favorited_by(query, _), do: query
928 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
929 raise "Can't use the child object without preloading!"
932 defp restrict_media(query, %{only_media: true}) do
934 [activity, object] in query,
935 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
936 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
940 defp restrict_media(query, _), do: query
942 defp restrict_replies(query, %{exclude_replies: true}) do
944 [_activity, object] in query,
945 where: fragment("?->>'inReplyTo' is null", object.data)
949 defp restrict_replies(query, %{
950 reply_filtering_user: %User{} = user,
951 reply_visibility: "self"
954 [activity, object] in query,
957 "?->>'inReplyTo' is null OR ? = ANY(?)",
965 defp restrict_replies(query, %{
966 reply_filtering_user: %User{} = user,
967 reply_visibility: "following"
970 [activity, object] in query,
974 ?->>'type' != 'Create' -- This isn't a Create
975 OR ?->>'inReplyTo' is null -- this isn't a reply
976 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
977 -- unless they are the author (because authors
978 -- are also part of the recipients). This leads
979 -- to a bug that self-replies by friends won't
981 OR ? = ? -- The actor is us
985 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
994 defp restrict_replies(query, _), do: query
996 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
997 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1000 defp restrict_reblogs(query, _), do: query
1002 defp restrict_muted(query, %{with_muted: true}), do: query
1004 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1005 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1008 from([activity] in query,
1009 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1012 "not (?->'to' \\?| ?) or ? = ?",
1020 unless opts[:skip_preload] do
1021 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1027 defp restrict_muted(query, _), do: query
1029 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1030 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1031 domain_blocks = user.domain_blocks || []
1033 following_ap_ids = User.get_friends_ap_ids(user)
1036 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1039 [activity, object: o] in query,
1040 # You don't block the author
1041 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1043 # You don't block any recipients, and didn't author the post
1046 "((not (? && ?)) or ? = ?)",
1047 activity.recipients,
1053 # You don't block the domain of any recipients, and didn't author the post
1056 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1057 activity.recipients,
1063 # It's not a boost of a user you block
1066 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1072 # You don't block the author's domain, and also don't follow the author
1075 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1082 # Same as above, but checks the Object
1085 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1094 defp restrict_blocked(query, _), do: query
1096 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1097 if Config.get([:activitypub, :blockers_visible]) == true do
1100 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1104 # The author doesn't block you
1105 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1107 # It's not a boost of a user that blocks you
1110 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1119 defp restrict_blockers_visibility(query, _), do: query
1121 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1126 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1128 ^[Constants.as_public()]
1133 defp restrict_unlisted(query, _), do: query
1135 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1137 [activity, object: o] in query,
1140 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1149 defp restrict_pinned(query, _), do: query
1151 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1152 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1158 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1166 defp restrict_muted_reblogs(query, _), do: query
1168 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1171 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1175 defp restrict_instance(query, _), do: query
1177 defp restrict_filtered(query, %{user: %User{} = user}) do
1178 case Filter.compose_regex(user) do
1183 from([activity, object] in query,
1185 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1186 activity.actor == ^user.ap_id
1191 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1192 restrict_filtered(query, %{user: user})
1195 defp restrict_filtered(query, _), do: query
1197 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1199 defp exclude_poll_votes(query, _) do
1200 if has_named_binding?(query, :object) do
1201 from([activity, object: o] in query,
1202 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1209 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1211 defp exclude_chat_messages(query, _) do
1212 if has_named_binding?(query, :object) do
1213 from([activity, object: o] in query,
1214 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1221 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1223 defp exclude_invisible_actors(query, _opts) do
1225 User.Query.build(%{invisible: true, select: [:ap_id]})
1227 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1229 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1232 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1233 from(activity in query, where: activity.id != ^id)
1236 defp exclude_id(query, _), do: query
1238 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1240 defp maybe_preload_objects(query, _) do
1242 |> Activity.with_preloaded_object()
1245 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1247 defp maybe_preload_bookmarks(query, opts) do
1249 |> Activity.with_preloaded_bookmark(opts[:user])
1252 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1254 |> Activity.with_preloaded_report_notes()
1257 defp maybe_preload_report_notes(query, _), do: query
1259 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1261 defp maybe_set_thread_muted_field(query, opts) do
1263 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1266 defp maybe_order(query, %{order: :desc}) do
1268 |> order_by(desc: :id)
1271 defp maybe_order(query, %{order: :asc}) do
1273 |> order_by(asc: :id)
1276 defp maybe_order(query, _), do: query
1278 defp normalize_fetch_activities_query_opts(opts) do
1279 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1281 value when is_bitstring(value) ->
1282 Map.put(opts, key, Hashtag.normalize_name(value))
1284 value when is_list(value) ->
1287 |> Enum.map(&Hashtag.normalize_name/1)
1290 Map.put(opts, key, normalized_value)
1298 defp fetch_activities_query_ap_ids_ops(opts) do
1299 source_user = opts[:muting_user]
1300 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1302 ap_id_relationships =
1303 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1304 [:block | ap_id_relationships]
1309 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1311 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1312 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1314 restrict_muted_reblogs_opts =
1315 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1317 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1320 def fetch_activities_query(recipients, opts \\ %{}) do
1321 opts = normalize_fetch_activities_query_opts(opts)
1323 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1324 fetch_activities_query_ap_ids_ops(opts)
1327 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1332 |> maybe_preload_objects(opts)
1333 |> maybe_preload_bookmarks(opts)
1334 |> maybe_preload_report_notes(opts)
1335 |> maybe_set_thread_muted_field(opts)
1336 |> maybe_order(opts)
1337 |> restrict_recipients(recipients, opts[:user])
1338 |> restrict_replies(opts)
1339 |> restrict_since(opts)
1340 |> restrict_local(opts)
1341 |> restrict_remote(opts)
1342 |> restrict_actor(opts)
1343 |> restrict_type(opts)
1344 |> restrict_state(opts)
1345 |> restrict_favorited_by(opts)
1346 |> restrict_blocked(restrict_blocked_opts)
1347 |> restrict_blockers_visibility(opts)
1348 |> restrict_muted(restrict_muted_opts)
1349 |> restrict_filtered(opts)
1350 |> restrict_media(opts)
1351 |> restrict_visibility(opts)
1352 |> restrict_thread_visibility(opts, config)
1353 |> restrict_reblogs(opts)
1354 |> restrict_pinned(opts)
1355 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1356 |> restrict_instance(opts)
1357 |> restrict_announce_object_actor(opts)
1358 |> restrict_filtered(opts)
1359 |> Activity.restrict_deactivated_users()
1360 |> exclude_poll_votes(opts)
1361 |> exclude_chat_messages(opts)
1362 |> exclude_invisible_actors(opts)
1363 |> exclude_visibility(opts)
1365 if Config.feature_enabled?(:improved_hashtag_timeline) do
1367 |> restrict_hashtag_any(opts)
1368 |> restrict_hashtag_all(opts)
1369 |> restrict_hashtag_reject_any(opts)
1372 |> restrict_embedded_tag_any(opts)
1373 |> restrict_embedded_tag_all(opts)
1374 |> restrict_embedded_tag_reject_any(opts)
1379 Fetch favorites activities of user with order by sort adds to favorites
1381 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1382 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1384 |> Activity.Queries.by_actor()
1385 |> Activity.Queries.by_type("Like")
1386 |> Activity.with_joined_object()
1387 |> Object.with_joined_activity()
1388 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1389 |> order_by([like, _, _], desc_nulls_last: like.id)
1390 |> Pagination.fetch_paginated(
1391 Map.merge(params, %{skip_order: true}),
1396 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1397 Enum.map(activities, fn
1398 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1399 if Enum.any?(bcc, &(&1 in list_memberships)) do
1400 update_in(activity.data["cc"], &[user_ap_id | &1])
1410 defp maybe_update_cc(activities, _, _), do: activities
1412 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1413 from(activity in query,
1415 fragment("? && ?", activity.recipients, ^recipients) or
1416 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1417 ^Constants.as_public() in activity.recipients)
1421 def fetch_activities_bounded(
1423 recipients_with_public,
1425 pagination \\ :keyset
1427 fetch_activities_query([], opts)
1428 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1429 |> Pagination.fetch_paginated(opts, pagination)
1433 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1434 def upload(file, opts \\ []) do
1435 with {:ok, data} <- Upload.store(file, opts) do
1436 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1438 Repo.insert(%Object{data: obj_data})
1442 @spec get_actor_url(any()) :: binary() | nil
1443 defp get_actor_url(url) when is_binary(url), do: url
1444 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1446 defp get_actor_url(url) when is_list(url) do
1452 defp get_actor_url(_url), do: nil
1454 defp normalize_image(%{"url" => url}) do
1457 "url" => [%{"href" => url}]
1461 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1462 defp normalize_image(_), do: nil
1464 defp object_to_user_data(data) do
1467 |> Map.get("attachment", [])
1468 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1469 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1473 |> Map.get("tag", [])
1475 %{"type" => "Emoji"} -> true
1478 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1479 {String.trim(name, ":"), url}
1482 is_locked = data["manuallyApprovesFollowers"] || false
1483 capabilities = data["capabilities"] || %{}
1484 accepts_chat_messages = capabilities["acceptsChatMessages"]
1485 data = Transmogrifier.maybe_fix_user_object(data)
1486 is_discoverable = data["discoverable"] || false
1487 invisible = data["invisible"] || false
1488 actor_type = data["type"] || "Person"
1490 featured_address = data["featured"]
1491 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1494 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1495 data["publicKey"]["publicKeyPem"]
1501 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1502 data["endpoints"]["sharedInbox"]
1509 uri: get_actor_url(data["url"]),
1511 banner: normalize_image(data["image"]),
1514 is_locked: is_locked,
1515 is_discoverable: is_discoverable,
1516 invisible: invisible,
1517 avatar: normalize_image(data["icon"]),
1519 follower_address: data["followers"],
1520 following_address: data["following"],
1521 featured_address: featured_address,
1522 bio: data["summary"] || "",
1523 actor_type: actor_type,
1524 also_known_as: Map.get(data, "alsoKnownAs", []),
1525 public_key: public_key,
1526 inbox: data["inbox"],
1527 shared_inbox: shared_inbox,
1528 accepts_chat_messages: accepts_chat_messages,
1529 pinned_objects: pinned_objects
1532 # nickname can be nil because of virtual actors
1533 if data["preferredUsername"] do
1537 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1540 Map.put(user_data, :nickname, nil)
1544 def fetch_follow_information_for_user(user) do
1545 with {:ok, following_data} <-
1546 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1547 {:ok, hide_follows} <- collection_private(following_data),
1548 {:ok, followers_data} <-
1549 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1550 {:ok, hide_followers} <- collection_private(followers_data) do
1553 hide_follows: hide_follows,
1554 follower_count: normalize_counter(followers_data["totalItems"]),
1555 following_count: normalize_counter(following_data["totalItems"]),
1556 hide_followers: hide_followers
1559 {:error, _} = e -> e
1564 defp normalize_counter(counter) when is_integer(counter), do: counter
1565 defp normalize_counter(_), do: 0
1567 def maybe_update_follow_information(user_data) do
1568 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1569 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1571 {:collections_available,
1572 !!(user_data[:following_address] && user_data[:follower_address])},
1574 fetch_follow_information_for_user(user_data) do
1575 info = Map.merge(user_data[:info] || %{}, info)
1578 |> Map.put(:info, info)
1580 {:user_type_check, false} ->
1583 {:collections_available, false} ->
1586 {:enabled, false} ->
1591 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1598 defp collection_private(%{"first" => %{"type" => type}})
1599 when type in ["CollectionPage", "OrderedCollectionPage"],
1602 defp collection_private(%{"first" => first}) do
1603 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1604 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1607 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1608 {:error, _} = e -> e
1613 defp collection_private(_data), do: {:ok, true}
1615 def user_data_from_user_object(data) do
1616 with {:ok, data} <- MRF.filter(data) do
1617 {:ok, object_to_user_data(data)}
1623 def fetch_and_prepare_user_from_ap_id(ap_id) do
1624 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1625 {:ok, data} <- user_data_from_user_object(data) do
1626 {:ok, maybe_update_follow_information(data)}
1628 # If this has been deleted, only log a debug and not an error
1629 {:error, "Object has been deleted" = e} ->
1630 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1633 {:error, {:reject, reason} = e} ->
1634 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1638 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1643 def maybe_handle_clashing_nickname(data) do
1644 with nickname when is_binary(nickname) <- data[:nickname],
1645 %User{} = old_user <- User.get_by_nickname(nickname),
1646 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1648 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1652 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1653 |> User.update_and_set_cache()
1655 {:ap_id_comparison, true} ->
1657 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1665 def pin_data_from_featured_collection(%{
1667 "orderedItems" => objects
1669 when type in ["OrderedCollection", "Collection"] do
1670 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1673 def fetch_and_prepare_featured_from_ap_id(nil) do
1677 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1678 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1679 {:ok, pin_data_from_featured_collection(data)}
1682 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1687 def pinned_fetch_task(nil), do: nil
1689 def pinned_fetch_task(%{pinned_objects: pins}) do
1690 if Enum.all?(pins, fn {ap_id, _} ->
1691 Object.get_cached_by_ap_id(ap_id) ||
1692 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1700 def make_user_from_ap_id(ap_id) do
1701 user = User.get_cached_by_ap_id(ap_id)
1703 if user && !User.ap_enabled?(user) do
1704 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1706 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1707 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1711 |> User.remote_user_changeset(data)
1712 |> User.update_and_set_cache()
1714 maybe_handle_clashing_nickname(data)
1717 |> User.remote_user_changeset()
1725 def make_user_from_nickname(nickname) do
1726 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1727 make_user_from_ap_id(ap_id)
1729 _e -> {:error, "No AP id in WebFinger"}
1733 # filter out broken threads
1734 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1735 entire_thread_visible_for_user?(activity, user)
1738 # do post-processing on a specific activity
1739 def contain_activity(%Activity{} = activity, %User{} = user) do
1740 contain_broken_threads(activity, user)
1743 def fetch_direct_messages_query do
1745 |> restrict_type(%{type: "Create"})
1746 |> restrict_visibility(%{visibility: "direct"})
1747 |> order_by([activity], asc: activity.id)