1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
34 require Pleroma.Constants
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39 defp get_recipients(%{"type" => "Create"} = data) do
40 to = Map.get(data, "to", [])
41 cc = Map.get(data, "cc", [])
42 bcc = Map.get(data, "bcc", [])
43 actor = Map.get(data, "actor", [])
44 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
48 defp get_recipients(data) do
49 to = Map.get(data, "to", [])
50 cc = Map.get(data, "cc", [])
51 bcc = Map.get(data, "bcc", [])
52 recipients = Enum.concat([to, cc, bcc])
56 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
57 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
60 case User.get_cached_by_ap_id(actor) do
61 %User{is_active: true} -> true
66 defp check_actor_can_insert(_), do: true
# Checks the embedded object's content against the configured
# `[:instance, :remote_limit]`, measured with `String.length/1`.
# Returns `true` when the content fits within the limit, `false` otherwise.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  case String.length(content) do
    measured when measured <= max_length -> true
    _too_long -> false
  end
end

# Data without non-nil embedded object content is not subject to the limit.
defp check_remote_limit(_data), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public
(as decided by `is_public?/1`); otherwise returns `{:ok, actor}` unchanged.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public
(as decided by `is_public?/1`); otherwise returns `{:ok, actor}` unchanged.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
83 defp increase_replies_count_if_reply(%{
84 "object" => %{"inReplyTo" => reply_ap_id} = object,
87 if is_public?(object) do
88 Object.increase_replies_count(reply_ap_id)
92 defp increase_replies_count_if_reply(_create_data), do: :noop
94 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
96 def persist(%{"type" => type} = object, meta) when type in @object_types do
97 with {:ok, object} <- Object.create(object) do
103 def persist(object, meta) do
104 with local <- Keyword.fetch!(meta, :local),
105 {recipients, _, _} <- get_recipients(object),
107 Repo.insert(%Activity{
110 recipients: recipients,
111 actor: object["actor"]
# TODO: add tests for expired activities once the Note type is supported in the new pipeline
114 {:ok, _} <- maybe_create_activity_expiration(activity) do
115 {:ok, activity, meta}
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
124 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map),
130 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
131 # Splice in the child object if we have one.
132 activity = Maps.put_if_present(activity, :object, object)
134 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
135 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
140 %Activity{} = activity ->
146 {:containment, _} = error ->
149 {:error, _} = error ->
152 {:fake, true, map, recipients} ->
153 activity = %Activity{
157 recipients: recipients,
161 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
164 {:remote_limit_pass, _} ->
165 {:error, :remote_limit}
172 defp insert_activity_with_expiration(data, local, recipients) do
176 actor: data["actor"],
177 recipients: recipients
180 with {:ok, activity} <- Repo.insert(struct) do
181 maybe_create_activity_expiration(activity)
@doc """
Creates notifications for `activity`, then creates or bumps the conversation
for it (on behalf of `activity.actor`) and streams out the participations of
that conversation.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  activity
  |> create_or_bump_conversation(activity.actor)
  |> get_participations()
  |> stream_out_participations()
end
194 defp maybe_create_activity_expiration(
195 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
198 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
199 activity_id: activity.id,
200 expires_at: expires_at
206 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
208 defp create_or_bump_conversation(activity, actor) do
209 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
210 %User{} = user <- User.get_cached_by_ap_id(actor) do
211 Participation.mark_as_read(user, conversation)
216 defp get_participations({:ok, conversation}) do
218 |> Repo.preload(:participations, force: true)
219 |> Map.get(:participations)
222 defp get_participations(_), do: []
224 def stream_out_participations(participations) do
227 |> Repo.preload(:user)
229 Streamer.stream("participation", participations)
233 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
234 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
235 conversation = Repo.preload(conversation, :participations)
238 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
243 if last_activity_id do
244 stream_out_participations(conversation.participations)
250 def stream_out_participations(_, _), do: :noop
253 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
254 when data_type in ["Create", "Announce", "Delete"] do
256 |> Topics.get_activity_topics()
257 |> Streamer.stream(activity)
261 def stream_out(_activity) do
265 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
266 def create(params, fake \\ false) do
267 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
272 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
273 additional = params[:additional] || %{}
# Only an explicit `false` counts as false; a missing or nil value is treated as true
275 local = !(params[:local] == false)
276 published = params[:published]
277 quick_insert? = Config.get([:env]) == :benchmark
281 %{to: to, actor: actor, published: published, context: context, object: object},
285 with {:ok, activity} <- insert(create_data, local, fake),
286 {:fake, false, activity} <- {:fake, fake, activity},
287 _ <- increase_replies_count_if_reply(create_data),
288 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
289 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
290 _ <- notify_and_stream(activity),
291 :ok <- maybe_federate(activity) do
294 {:quick_insert, true, activity} ->
297 {:fake, true, activity} ->
301 Repo.rollback(message)
305 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
306 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
307 additional = params[:additional] || %{}
# Only an explicit `false` counts as false; a missing or nil value is treated as true
309 local = !(params[:local] == false)
310 published = params[:published]
314 %{to: to, actor: actor, published: published, context: context, object: object},
318 with {:ok, activity} <- insert(listen_data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
325 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
326 {:ok, Activity.t()} | nil | {:error, any()}
327 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
334 defp do_unfollow(follower, followed, activity_id, local) do
335 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
336 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
337 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
338 {:ok, activity} <- insert(unfollow_data, local),
339 _ <- notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
344 {:error, error} -> Repo.rollback(error)
348 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
350 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# Only an explicit `false` counts as false; a missing or nil value is treated as true
365 local = !(params[:local] == false)
366 forward = !(params[:forward] == false)
368 additional = params[:additional] || %{}
372 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
374 Map.merge(additional, %{"to" => [], "cc" => []})
377 with flag_data <- make_flag_data(params, additional),
378 {:ok, activity} <- insert(flag_data, local),
379 {:ok, stripped_activity} <- strip_report_status_data(activity),
380 _ <- notify_and_stream(activity),
382 maybe_federate(stripped_activity) do
383 User.all_superusers()
384 |> Enum.filter(fn user -> user.ap_id != actor end)
385 |> Enum.filter(fn user -> not is_nil(user.email) end)
386 |> Enum.each(fn superuser ->
388 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
389 |> Pleroma.Emails.Mailer.deliver_async()
394 {:error, error} -> Repo.rollback(error)
398 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
399 def move(%User{} = origin, %User{} = target, local \\ true) do
402 "actor" => origin.ap_id,
403 "object" => origin.ap_id,
404 "target" => target.ap_id
407 with true <- origin.ap_id in target.also_known_as,
408 {:ok, activity} <- insert(params, local),
409 _ <- notify_and_stream(activity) do
410 maybe_federate(activity)
412 BackgroundWorker.enqueue("move_following", %{
413 "origin_id" => origin.id,
414 "target_id" => target.id
419 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
424 def fetch_activities_for_context_query(context, opts) do
425 public = [Constants.as_public()]
429 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
432 from(activity in Activity)
433 |> maybe_preload_objects(opts)
434 |> maybe_preload_bookmarks(opts)
435 |> maybe_set_thread_muted_field(opts)
436 |> restrict_blocked(opts)
437 |> restrict_recipients(recipients, opts[:user])
438 |> restrict_filtered(opts)
442 "?->>'type' = ? and ?->>'context' = ?",
449 |> exclude_poll_votes(opts)
451 |> order_by([activity], desc: activity.id)
454 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
455 def fetch_activities_for_context(context, opts \\ %{}) do
457 |> fetch_activities_for_context_query(opts)
461 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
462 FlakeId.Ecto.CompatType.t() | nil
463 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
465 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
466 |> restrict_visibility(%{visibility: "direct"})
# Delegates to `Pagination.fetch_paginated/3` with `:skip_extra_order` set in opts.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
480 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
481 list_memberships = Pleroma.List.memberships(opts[:user])
483 fetch_activities_query(recipients ++ list_memberships, opts)
484 |> fetch_paginated_optimized(opts, pagination)
486 |> maybe_update_cc(list_memberships, opts[:user])
@doc """
Fetches a page of activities addressed to the public collection
(`Constants.as_public/0`).

The `:user` key is removed from `opts` before the query is built. The query is
also passed through `restrict_unlisted/2`, which only narrows it when
`opts` contains `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  base_query = fetch_activities_query([Constants.as_public()], anonymous_opts)
  restricted_query = restrict_unlisted(base_query, anonymous_opts)

  fetch_paginated_optimized(restricted_query, anonymous_opts, pagination)
end
499 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
500 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
502 |> Map.put(:restrict_unlisted, true)
503 |> fetch_public_or_unlisted_activities(pagination)
506 @valid_visibilities ~w[direct unlisted public private]
508 defp restrict_visibility(query, %{visibility: visibility})
509 when is_list(visibility) do
510 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
515 "activity_visibility(?, ?, ?) = ANY (?)",
523 Logger.error("Could not restrict visibility to #{visibility}")
527 defp restrict_visibility(query, %{visibility: visibility})
528 when visibility in @valid_visibilities do
532 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
536 defp restrict_visibility(_query, %{visibility: visibility})
537 when visibility not in @valid_visibilities do
538 Logger.error("Could not restrict visibility to #{visibility}")
541 defp restrict_visibility(query, _visibility), do: query
543 defp exclude_visibility(query, %{exclude_visibilities: visibility})
544 when is_list(visibility) do
545 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
550 "activity_visibility(?, ?, ?) = ANY (?)",
558 Logger.error("Could not exclude visibility to #{visibility}")
563 defp exclude_visibility(query, %{exclude_visibilities: visibility})
564 when visibility in @valid_visibilities do
569 "activity_visibility(?, ?, ?) = ?",
578 defp exclude_visibility(query, %{exclude_visibilities: visibility})
579 when visibility not in [nil | @valid_visibilities] do
580 Logger.error("Could not exclude visibility to #{visibility}")
584 defp exclude_visibility(query, _visibility), do: query
586 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
589 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
592 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
595 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
599 defp restrict_thread_visibility(query, _, _), do: query
601 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
604 |> Map.put(:user, reading_user)
605 |> Map.put(:actor_id, user.ap_id)
608 godmode: params[:godmode],
609 reading_user: reading_user
611 |> user_activities_recipients()
612 |> fetch_activities(params)
616 def fetch_user_activities(user, reading_user, params \\ %{})
618 def fetch_user_activities(user, reading_user, %{total: true} = params) do
619 result = fetch_activities_for_user(user, reading_user, params)
621 Keyword.put(result, :items, Enum.reverse(result[:items]))
624 def fetch_user_activities(user, reading_user, params) do
626 |> fetch_activities_for_user(reading_user, params)
630 defp fetch_activities_for_user(user, reading_user, params) do
633 |> Map.put(:type, ["Create", "Announce"])
634 |> Map.put(:user, reading_user)
635 |> Map.put(:actor_id, user.ap_id)
636 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
639 if User.blocks?(reading_user, user) do
643 |> Map.put(:blocking_user, reading_user)
644 |> Map.put(:muting_user, reading_user)
647 pagination_type = Map.get(params, :pagination_type) || :keyset
650 godmode: params[:godmode],
651 reading_user: reading_user
653 |> user_activities_recipients()
654 |> fetch_activities(params, pagination_type)
657 def fetch_statuses(reading_user, %{total: true} = params) do
658 result = fetch_activities_for_reading_user(reading_user, params)
659 Keyword.put(result, :items, Enum.reverse(result[:items]))
662 def fetch_statuses(reading_user, params) do
664 |> fetch_activities_for_reading_user(params)
668 defp fetch_activities_for_reading_user(reading_user, params) do
669 params = Map.put(params, :type, ["Create", "Announce"])
672 godmode: params[:godmode],
673 reading_user: reading_user
675 |> user_activities_recipients()
676 |> fetch_activities(params, :offset)
679 defp user_activities_recipients(%{godmode: true}), do: []
681 defp user_activities_recipients(%{reading_user: reading_user}) do
683 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
685 [Constants.as_public()]
689 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
690 raise "Can't use the child object without preloading!"
693 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
695 [activity, object] in query,
698 "?->>'type' != ? or ?->>'actor' != ?",
707 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
# An empty-string `:since_id` is treated the same as no `:since_id` at all.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
717 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
718 raise_on_missing_preload()
721 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
723 [_activity, object] in query,
724 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
728 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
729 restrict_embedded_tag_any(query, %{tag: tag})
732 defp restrict_embedded_tag_all(query, _), do: query
734 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
735 raise_on_missing_preload()
738 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
740 [_activity, object] in query,
741 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
745 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
746 restrict_embedded_tag_any(query, %{tag: [tag]})
749 defp restrict_embedded_tag_any(query, _), do: query
751 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
752 raise_on_missing_preload()
755 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
757 [_activity, object] in query,
758 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
762 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
763 when is_binary(tag_reject) do
764 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
767 defp restrict_embedded_tag_reject_any(query, _), do: query
769 defp object_ids_query_for_tags(tags) do
770 from(hto in "hashtags_objects")
771 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
772 |> where([hto, ht], ht.name in ^tags)
773 |> select([hto], hto.object_id)
774 |> distinct([hto], true)
777 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
778 raise_on_missing_preload()
781 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
782 restrict_hashtag_any(query, %{tag: single_tag})
785 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
787 [_activity, object] in query,
791 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
792 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
793 AND hashtags_objects.object_id = ?) @> ?
802 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
803 restrict_hashtag_all(query, %{tag_all: [tag]})
806 defp restrict_hashtag_all(query, _), do: query
808 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
809 raise_on_missing_preload()
812 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
814 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
817 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
819 [_activity, object] in query,
820 join: hto in "hashtags_objects",
821 on: hto.object_id == object.id,
822 where: hto.hashtag_id in ^hashtag_ids,
823 distinct: [desc: object.id],
824 order_by: [desc: object.id]
828 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
829 restrict_hashtag_any(query, %{tag: [tag]})
832 defp restrict_hashtag_any(query, _), do: query
834 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
835 raise_on_missing_preload()
838 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
840 [_activity, object] in query,
841 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
845 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
846 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
849 defp restrict_hashtag_reject_any(query, _), do: query
# Shared failure path for restrictions that need the joined child object but
# were invoked with `skip_preload: true`. Always raises a `RuntimeError`.
defp raise_on_missing_preload,
  do: raise(RuntimeError, "Can't use the child object without preloading!")
855 defp restrict_recipients(query, [], _user), do: query
857 defp restrict_recipients(query, recipients, nil) do
858 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
861 defp restrict_recipients(query, recipients, user) do
864 where: fragment("? && ?", ^recipients, activity.recipients),
865 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
# When `:actor_id` is given, keeps only activities whose actor equals it.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
# Filters on the activity's JSON "type" field.
# A binary `:type` must match exactly; any other `:type` value is handed to
# `= ANY(?)`, so it is expected to be a list of type names.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
# When `:state` is given, keeps only activities whose JSON "state" field equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
903 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
905 [_activity, object] in query,
906 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
910 defp restrict_favorited_by(query, _), do: query
912 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
913 raise "Can't use the child object without preloading!"
916 defp restrict_media(query, %{only_media: true}) do
918 [activity, object] in query,
919 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
920 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
924 defp restrict_media(query, _), do: query
926 defp restrict_replies(query, %{exclude_replies: true}) do
928 [_activity, object] in query,
929 where: fragment("?->>'inReplyTo' is null", object.data)
933 defp restrict_replies(query, %{
934 reply_filtering_user: %User{} = user,
935 reply_visibility: "self"
938 [activity, object] in query,
941 "?->>'inReplyTo' is null OR ? = ANY(?)",
949 defp restrict_replies(query, %{
950 reply_filtering_user: %User{} = user,
951 reply_visibility: "following"
954 [activity, object] in query,
958 ?->>'type' != 'Create' -- This isn't a Create
959 OR ?->>'inReplyTo' is null -- this isn't a reply
960 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
961 -- unless they are the author (because authors
962 -- are also part of the recipients). This leads
963 -- to a bug that self-replies by friends won't
965 OR ? = ? -- The actor is us
969 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
978 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose JSON "type" is 'Announce'.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
986 defp restrict_muted(query, %{with_muted: true}), do: query
988 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
989 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
992 from([activity] in query,
993 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
996 "not (?->'to' \\?| ?) or ? = ?",
1004 unless opts[:skip_preload] do
1005 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1011 defp restrict_muted(query, _), do: query
1013 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1014 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1015 domain_blocks = user.domain_blocks || []
1017 following_ap_ids = User.get_friends_ap_ids(user)
1020 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1023 [activity, object: o] in query,
1024 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1027 "((not (? && ?)) or ? = ?)",
1028 activity.recipients,
1035 "recipients_contain_blocked_domains(?, ?) = false",
1036 activity.recipients,
1041 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1048 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1056 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1065 defp restrict_blocked(query, _), do: query
1067 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1072 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1074 ^[Constants.as_public()]
1079 defp restrict_unlisted(query, _), do: query
1081 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1083 [activity, object: o] in query,
1086 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1095 defp restrict_pinned(query, _), do: query
1097 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1098 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1104 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1112 defp restrict_muted_reblogs(query, _), do: query
1114 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1117 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1121 defp restrict_instance(query, _), do: query
1123 defp restrict_filtered(query, %{user: %User{} = user}) do
1124 case Filter.compose_regex(user) do
1129 from([activity, object] in query,
1131 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1132 activity.actor == ^user.ap_id
1137 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1138 restrict_filtered(query, %{user: user})
1141 defp restrict_filtered(query, _), do: query
1143 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1145 defp exclude_poll_votes(query, _) do
1146 if has_named_binding?(query, :object) do
1147 from([activity, object: o] in query,
1148 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1155 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1157 defp exclude_chat_messages(query, _) do
1158 if has_named_binding?(query, :object) do
1159 from([activity, object: o] in query,
1160 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1167 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1169 defp exclude_invisible_actors(query, _opts) do
1171 User.Query.build(%{invisible: true, select: [:ap_id]})
1173 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1175 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# When a binary `:exclude_id` is given, removes the activity with that id
# from the result set; otherwise the query is returned as-is.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1184 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1186 defp maybe_preload_objects(query, _) do
1188 |> Activity.with_preloaded_object()
1191 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1193 defp maybe_preload_bookmarks(query, opts) do
1195 |> Activity.with_preloaded_bookmark(opts[:user])
1198 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1200 |> Activity.with_preloaded_report_notes()
1203 defp maybe_preload_report_notes(query, _), do: query
1205 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1207 defp maybe_set_thread_muted_field(query, opts) do
1209 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1212 defp maybe_order(query, %{order: :desc}) do
1214 |> order_by(desc: :id)
1217 defp maybe_order(query, %{order: :asc}) do
1219 |> order_by(asc: :id)
1222 defp maybe_order(query, _), do: query
1224 defp normalize_fetch_activities_query_opts(opts) do
1225 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1227 value when is_bitstring(value) ->
1228 Map.put(opts, key, Hashtag.normalize_name(value))
1230 value when is_list(value) ->
1233 |> Enum.map(&Hashtag.normalize_name/1)
1236 Map.put(opts, key, normalized_value)
1244 defp fetch_activities_query_ap_ids_ops(opts) do
1245 source_user = opts[:muting_user]
1246 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1248 ap_id_relationships =
1249 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1250 [:block | ap_id_relationships]
1255 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1257 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1258 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1260 restrict_muted_reblogs_opts =
1261 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1263 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1266 def fetch_activities_query(recipients, opts \\ %{}) do
1267 opts = normalize_fetch_activities_query_opts(opts)
1269 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1270 fetch_activities_query_ap_ids_ops(opts)
1273 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1278 |> maybe_preload_objects(opts)
1279 |> maybe_preload_bookmarks(opts)
1280 |> maybe_preload_report_notes(opts)
1281 |> maybe_set_thread_muted_field(opts)
1282 |> maybe_order(opts)
1283 |> restrict_recipients(recipients, opts[:user])
1284 |> restrict_replies(opts)
1285 |> restrict_since(opts)
1286 |> restrict_local(opts)
1287 |> restrict_remote(opts)
1288 |> restrict_actor(opts)
1289 |> restrict_type(opts)
1290 |> restrict_state(opts)
1291 |> restrict_favorited_by(opts)
1292 |> restrict_blocked(restrict_blocked_opts)
1293 |> restrict_muted(restrict_muted_opts)
1294 |> restrict_filtered(opts)
1295 |> restrict_media(opts)
1296 |> restrict_visibility(opts)
1297 |> restrict_thread_visibility(opts, config)
1298 |> restrict_reblogs(opts)
1299 |> restrict_pinned(opts)
1300 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1301 |> restrict_instance(opts)
1302 |> restrict_announce_object_actor(opts)
1303 |> restrict_filtered(opts)
1304 |> Activity.restrict_deactivated_users()
1305 |> exclude_poll_votes(opts)
1306 |> exclude_chat_messages(opts)
1307 |> exclude_invisible_actors(opts)
1308 |> exclude_visibility(opts)
1310 if Config.feature_enabled?(:improved_hashtag_timeline) do
1312 |> restrict_hashtag_any(opts)
1313 |> restrict_hashtag_all(opts)
1314 |> restrict_hashtag_reject_any(opts)
1317 |> restrict_embedded_tag_any(opts)
1318 |> restrict_embedded_tag_all(opts)
1319 |> restrict_embedded_tag_reject_any(opts)
Fetches the user's favorited activities, ordered by when they were added to favorites.
1326 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
# Starts from "Like" activities filtered by actor, joins each liked object and
# then that object's activity. Each selected row is the joined activity with
# its `object` filled in and `pagination_id` set to the Like's id.
# `pagination` defaults to :keyset.
1327 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1329 |> Activity.Queries.by_actor()
1330 |> Activity.Queries.by_type("Like")
1331 |> Activity.with_joined_object()
1332 |> Object.with_joined_activity()
1333 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
# Ordering is done here on the Like id (descending, nulls last), so the
# paginator is told to skip its own ordering via `skip_order: true`.
1334 |> order_by([like, _, _], desc_nulls_last: like.id)
1335 |> Pagination.fetch_paginated(
1336 Map.merge(params, %{skip_order: true}),
# For every activity with a non-empty "bcc": if any bcc entry is one of
# `list_memberships`, the user's ap_id is prepended to the activity's "cc".
# This clause only applies to a non-empty `list_memberships` and a %User{}.
# The fallback clause at the end returns `activities` untouched.
1341 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1342 Enum.map(activities, fn
1343 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1344 if Enum.any?(bcc, &(&1 in list_memberships)) do
1345 update_in(activity.data["cc"], &[user_ap_id | &1])
1355 defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities that satisfy either of:
#   * their recipients overlap `recipients`, or
#   * their recipients overlap `recipients_with_public` AND include the public
#     address (Constants.as_public()).
# `&&` inside the fragments is the PostgreSQL array-overlap operator.
1357 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1358 from(activity in query,
1360 fragment("? && ?", activity.recipients, ^recipients) or
1361 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1362 ^Constants.as_public() in activity.recipients)
# Fetches one page of activities bounded by two recipient lists.
#
# The base query is built with an empty recipient list, so the recipient
# filtering is done by fetch_activities_bounded_query/3 instead. The result is
# paginated with `pagination`, which defaults to :keyset.
1366 def fetch_activities_bounded(
1368 recipients_with_public,
1370 pagination \\ :keyset
1372 fetch_activities_query([], opts)
1373 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1374 |> Pagination.fetch_paginated(opts, pagination)
# Stores an uploaded file and persists it as an Object.
#
# `file` and `opts` go straight to `Upload.store/2`. On `{:ok, data}` the data
# (with "actor" taken from `opts[:actor]` through `Maps.put_if_present/3`)
# becomes the `data` of a newly inserted `Object`. Any other result from
# `Upload.store/2` is handed back to the caller unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      %Object{data: Maps.put_if_present(stored, "actor", opts[:actor])}
      |> Repo.insert()

    failure ->
      failure
  end
end
1387 @spec get_actor_url(any()) :: binary() | nil
# Extracts a URL string from an actor's "url" value:
#   * a binary is returned as is;
#   * a map with a binary "href" yields that href;
#   * a list is handled by its own clause (body not visible in this chunk;
#     presumably it resolves the first entry — TODO confirm);
#   * anything else yields nil.
1388 defp get_actor_url(url) when is_binary(url), do: url
1389 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1391 defp get_actor_url(url) when is_list(url) do
1397 defp get_actor_url(_url), do: nil
# Normalizes an actor image value ("icon" / "image"):
#   * a map with "url" is rebuilt so that "url" is a one-element list of
#     `%{"href" => url}`;
#   * a list is reduced to its first element, which is normalized in turn
#     (an empty list gives `nil` via the fallback clause);
#   * anything else yields nil.
1399 defp normalize_image(%{"url" => url}) do
1402 "url" => [%{"href" => url}]
1406 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1407 defp normalize_image(_), do: nil
# Converts an actor object (string-keyed map) into the atom-keyed attribute map
# used for remote users: profile fields, emoji, flags, addresses, keys, images
# and the nickname.
1409 defp object_to_user_data(data) do
# Profile fields: "attachment" entries of type "PropertyValue", reduced to
# their "name" and "value".
# NOTE(review): the filter fn has a single clause matching %{"type" => t}; an
# attachment without a "type" key would raise FunctionClauseError — confirm
# that input is validated upstream.
1412 |> Map.get("attachment", [])
1413 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1414 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: "tag" entries of type "Emoji" become a map of
# shortcode (name with surrounding ":" trimmed) => icon url.
# NOTE(review): the Map.new fn requires both "icon" => %{"url" => _} and
# "name"; an Emoji tag lacking either would raise — confirm.
1418 |> Map.get("tag", [])
1420 %{"type" => "Emoji"} -> true
1423 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1424 {String.trim(name, ":"), url}
# These values are read from the raw object, before
# Transmogrifier.maybe_fix_user_object/1 is applied; everything below reads
# the fixed object.
1427 is_locked = data["manuallyApprovesFollowers"] || false
1428 capabilities = data["capabilities"] || %{}
1429 accepts_chat_messages = capabilities["acceptsChatMessages"]
1430 data = Transmogrifier.maybe_fix_user_object(data)
1431 is_discoverable = data["discoverable"] || false
1432 invisible = data["invisible"] || false
1433 actor_type = data["type"] || "Person"
# The featured (pinned) collection is resolved here, as part of building the
# user data. The match is assertive: anything other than {:ok, _} raises.
1435 featured_address = data["featured"]
1436 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
# Public key and shared inbox are only taken when the nested value has the
# expected shape (a map holding a binary).
1439 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1440 data["publicKey"]["publicKeyPem"]
1446 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1447 data["endpoints"]["sharedInbox"]
1454 uri: get_actor_url(data["url"]),
1456 banner: normalize_image(data["image"]),
1459 is_locked: is_locked,
1460 is_discoverable: is_discoverable,
1461 invisible: invisible,
1462 avatar: normalize_image(data["icon"]),
1464 follower_address: data["followers"],
1465 following_address: data["following"],
1466 featured_address: featured_address,
1467 bio: data["summary"] || "",
1468 actor_type: actor_type,
1469 also_known_as: Map.get(data, "alsoKnownAs", []),
1470 public_key: public_key,
1471 inbox: data["inbox"],
1472 shared_inbox: shared_inbox,
1473 accepts_chat_messages: accepts_chat_messages,
1474 pinned_objects: pinned_objects
1477 # nickname can be nil because of virtual actors
# With a preferredUsername the nickname is "<preferredUsername>@<host of the
# actor id>"; without one it is explicitly set to nil.
1478 if data["preferredUsername"] do
1482 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1485 Map.put(user_data, :nickname, nil)
# Fetches the user's remote "following" and "followers" collections and derives
# follow statistics from them.
#
# For each collection, collection_private/1 decides whether it is hidden, and
# "totalItems" is turned into a counter via normalize_counter/1 (non-integer
# values become 0). `{:error, _}` results are returned unchanged.
1489 def fetch_follow_information_for_user(user) do
1490 with {:ok, following_data} <-
1491 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1492 {:ok, hide_follows} <- collection_private(following_data),
1493 {:ok, followers_data} <-
1494 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1495 {:ok, hide_followers} <- collection_private(followers_data) do
1498 hide_follows: hide_follows,
1499 follower_count: normalize_counter(followers_data["totalItems"]),
1500 following_count: normalize_counter(following_data["totalItems"]),
1501 hide_followers: hide_followers
1504 {:error, _} = e -> e
# Coerces a remote "totalItems" value into a counter: integers pass through
# unchanged, every other value (nil, strings, floats, ...) counts as 0.
defp normalize_counter(counter) do
  case counter do
    count when is_integer(count) -> count
    _other -> 0
  end
end
# Enriches `user_data` with remote follow information when all preconditions
# hold: external user synchronization is enabled, the user type is "Person" or
# "Service", and both the following and follower addresses are present.
# The tagged tuples in the `with` head let the else-branches tell apart which
# precondition failed.
1512 def maybe_update_follow_information(user_data) do
1513 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# NOTE(review): this reads `user_data[:type]`, while the user-data keys visible
# elsewhere in this file include `actor_type:` — verify that a :type key is
# ever present, otherwise this check can never pass.
1514 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1516 {:collections_available,
1517 !!(user_data[:following_address] && user_data[:follower_address])},
1519 fetch_follow_information_for_user(user_data) do
# Fetched values are merged over any existing `:info` map (fetched keys win).
1520 info = Map.merge(user_data[:info] || %{}, info)
1523 |> Map.put(:info, info)
1525 {:user_type_check, false} ->
1528 {:collections_available, false} ->
1531 {:enabled, false} ->
1536 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a follow collection is private, judging by its "first" page.
#
#   * "first" embedded as a CollectionPage / OrderedCollectionPage map: handled
#     by the first clause (its result is not visible in this chunk).
#   * "first" given as a reference: the page is fetched; a 401 or 403 response
#     means the collection is private (`{:ok, true}`); other `{:error, _}`
#     results are returned unchanged.
#   * no "first" at all: treated as private (`{:ok, true}`).
1543 defp collection_private(%{"first" => %{"type" => type}})
1544 when type in ["CollectionPage", "OrderedCollectionPage"],
1547 defp collection_private(%{"first" => first}) do
1548 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1549 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1552 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1553 {:error, _} = e -> e
1558 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through the MRF filter and, when the filter returns
# `{:ok, data}`, converts the filtered object with object_to_user_data/1,
# returning `{:ok, user_data}`.
1560 def user_data_from_user_object(data) do
1561 with {:ok, data} <- MRF.filter(data) do
1562 {:ok, object_to_user_data(data)}
# Fetches the remote actor at `ap_id`, converts it to user data and attempts to
# add follow information via maybe_update_follow_information/1.
#
# Failures are logged at a level that matches their severity: a deleted object
# at debug, an MRF rejection at info, anything else at error.
1568 def fetch_and_prepare_user_from_ap_id(ap_id) do
1569 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1570 {:ok, data} <- user_data_from_user_object(data) do
1571 {:ok, maybe_update_follow_information(data)}
1573 # If this has been deleted, only log a debug and not an error
1574 {:error, "Object has been deleted" = e} ->
1575 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1578 {:error, {:reject, reason} = e} ->
1579 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1583 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Resolves a nickname clash before new user data is stored.
#
# If `data[:nickname]` is a binary and an existing user already holds that
# nickname under a different ap_id, the old user is renamed to
# "<old id>.<old nickname>" and its cache entry is updated. When the ap_ids
# are equal nothing is changed; that case is only reported (possible race).
1588 def maybe_handle_clashing_nickname(data) do
1589 with nickname when is_binary(nickname) <- data[:nickname],
1590 %User{} = old_user <- User.get_by_nickname(nickname),
1591 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1593 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1599 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1600 |> User.update_and_set_cache()
1602 {:ap_id_comparison, true} ->
1604 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Builds the pinned-objects map from a fetched "featured" collection.
#
# Accepts a `Collection` / `OrderedCollection` and returns
# `%{object_ap_id => pinned_at}`, where `pinned_at` is the time the entry was
# processed.
#
# ActivityStreams allows collection items to be either embedded objects or
# bare IRIs, so both `%{"id" => ap_id}` maps and plain strings are accepted.
# Entries of any other shape are skipped instead of raising, so one malformed
# item no longer aborts the whole actor fetch.
def pin_data_from_featured_collection(%{
      "type" => type,
      "orderedItems" => objects
    })
    when type in ["OrderedCollection", "Collection"] do
  objects
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [object_ap_id]
    object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
    _malformed -> []
  end)
  |> Map.new(fn object_ap_id -> {object_ap_id, NaiveDateTime.utc_now()} end)
end
# Resolves a user's featured (pinned) collection into pin data.
#
# A `nil` address has its own clause (body not visible in this chunk). For any
# other address the collection is fetched and converted with
# pin_data_from_featured_collection/1, giving `{:ok, pins}`; a failed fetch is
# logged as an error.
1620 def fetch_and_prepare_featured_from_ap_id(nil) do
1624 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1625 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1626 {:ok, pin_data_from_featured_collection(data)}
1629 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
# Checks that every pinned object of the given user data is available locally.
#
# For each `{ap_id, _}` pin, the object counts as available when it is already
# cached or when Fetcher.fetch_object_from_id/1 returns `{:ok, _}`. Because of
# the `||`, the fetch is only attempted on a cache miss. `nil` input is a no-op
# that returns nil.
1634 def pinned_fetch_task(nil), do: nil
1636 def pinned_fetch_task(%{pinned_objects: pins}) do
1637 if Enum.all?(pins, fn {ap_id, _} ->
1638 Object.get_cached_by_ap_id(ap_id) ||
1639 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
# Creates or refreshes the local record of the remote user at `ap_id`.
#
# A cached user that is not yet AP-enabled is upgraded through the
# Transmogrifier. Otherwise the actor is fetched and prepared, and the pinned
# objects are fetched in a separate Task so they do not block this call.
# The data is then applied through User.remote_user_changeset; the path that
# calls maybe_handle_clashing_nickname/1 is presumably the new-user case —
# TODO confirm.
1647 def make_user_from_ap_id(ap_id) do
1648 user = User.get_cached_by_ap_id(ap_id)
1650 if user && !User.ap_enabled?(user) do
1651 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1653 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
# Assertive match: failing to start the task raises here.
1654 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1658 |> User.remote_user_changeset(data)
1659 |> User.update_and_set_cache()
1661 maybe_handle_clashing_nickname(data)
1664 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# advertises. Any WebFinger outcome without a non-nil "ap_id" is reported as
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Thread containment check: delegates to entire_thread_visible_for_user?/2 to
# decide whether the activity's whole thread is visible to `user`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity as seen by `user`; at present it
# only applies the broken-thread containment check and returns its result.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Query for direct messages: restricted to "Create" activities with "direct"
# visibility, ordered by ascending activity id. The starting point of the
# pipeline is not visible in this chunk.
1690 def fetch_direct_messages_query do
1692 |> restrict_type(%{type: "Create"})
1693 |> restrict_visibility(%{visibility: "direct"})
1694 |> order_by([activity], asc: activity.id)