1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Collects the recipients of an activity from its "to", "cc" and "bcc" fields
# (each defaulting to []). For "Create" activities the actor is appended as well
# and duplicates are removed; the generic clause concatenates without de-duplicating.
# NOTE(review): when "actor" is missing, the `[]` default is wrapped as `[[]]`, so an
# empty list would end up as a recipient entry - confirm this is intended.
# NOTE(review): callers match the result as `{recipients, _, _}`; the returning
# expression is not visible in this excerpt.
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
# Tells whether the given actor may act: a nil actor always passes, and an actor
# that resolves to a cached %User{} passes only when that user is not deactivated.
# NOTE(review): the branch taken when no cached user is found is not visible here.
55 defp check_actor_is_active(nil), do: true
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{deactivated: deactivated} -> not deactivated
# Checks the object's non-nil "content" against the configured
# [:instance, :remote_limit], measuring it with String.length/1.
# Any input without such content passes (returns true).
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
69 defp check_remote_limit(_), do: true
# Calls User.increase_note_count/1 for the actor when `is_public?(object)` is truthy;
# otherwise returns `{:ok, actor}` without touching the count.
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Calls User.decrease_note_count/1 for the actor when `is_public?(object)` is truthy;
# otherwise returns `{:ok, actor}` without touching the count.
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# For create data whose "object" carries an "inReplyTo", bumps the replies count of
# the replied-to object via Object.increase_replies_count/1 when the object is public.
# Input that does not match the first clause yields :noop.
# NOTE(review): part of the first clause's pattern is not visible in this excerpt.
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
88 defp increase_replies_count_if_reply(_create_data), do: :noop
# Object types that persist/2 stores through Object.create/1 instead of as an activity.
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# Persists a map whose "type" is one of @object_types by creating an Object.
# NOTE(review): the success return of this clause is not visible in this excerpt.
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
# Persists any other map as an %Activity{}: reads :local from meta (Keyword.fetch!/2
# raises when it is absent), computes the recipients, inserts the activity and lets
# maybe_create_activity_expiration/1 schedule expiry. Returns `{:ok, activity, meta}`.
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
# Inserts an activity map, proceeding only when Activity.normalize/1 returns nil for it.
# Visible steps, in order: fill in defaults, check that the actor is active (skipped
# when bypass_actor_check is truthy), enforce the remote content limit, run the MRF
# filter, compute recipients, check containment, insert the embedded object, then
# insert the activity together with its expiration.
# After a successful insert the rich-media fetch is started in a Task under
# ConcurrentLimiter.
# With `fake` set, the `:fake` branch builds an %Activity{} struct from the map and
# recipients instead. A failed remote-limit check returns `{:error, :remote_limit}`.
# NOTE(review): several branch bodies are not visible in this excerpt.
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
# Builds a struct carrying the data's "actor" and the given recipients, inserts it
# with Repo.insert/1 and, on success, passes the stored activity to
# maybe_create_activity_expiration/1.
# NOTE(review): the remaining struct fields are not visible in this excerpt.
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
# Creates notifications for the activity, creates or bumps its conversation on behalf
# of the activity's actor, and streams out the resulting participations.
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
# When the activity data holds a %DateTime{} under "expires_at", enqueues a
# Pleroma.Workers.PurgeExpiredActivity job with the activity id and that time.
# Activities without such a value are returned as `{:ok, activity}`.
# NOTE(review): the return value of the first clause is not visible in this excerpt.
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for the activity and, when the actor resolves to
# a cached %User{}, marks that user's participation in the conversation as read.
# NOTE(review): the value returned on success is not visible in this excerpt.
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
# Returns the participations of an `{:ok, conversation}` result, re-reading them
# from the database (preload with force: true). Any other input yields [].
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
218 defp get_participations(_), do: []
# Preloads the user of each participation and pushes the participations to the
# "participation" stream.
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
# Two-argument form: looks up the conversation for the object's "context", preloads
# its participations and streams them when `last_activity_id` is truthy.
# Arguments that do not match (including an unknown context) give :noop.
# NOTE(review): `last_activity_id` is presumably bound from
# fetch_latest_direct_activity_id_for_context/2; the binding line is not visible here.
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
246 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed by
# Topics.get_activity_topics/1.
# NOTE(review): the body of the fallback clause is not visible in this excerpt.
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
257 def stream_out(_activity) do
# Runs do_create/2 inside a database transaction, so a Repo.rollback/1 issued there
# undoes everything written during the attempt.
# NOTE(review): how the transaction result is unwrapped is not visible in this excerpt.
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body for create/2. Inserts the create data, then - unless this is a
# fake insert or the :benchmark environment requested a quick insert - bumps the reply
# count, bumps the actor's note count for public activities, notifies/streams and
# federates. Unhandled failures roll the transaction back.
# `local` is true unless params[:local] is exactly false.
# NOTE(review): construction of `create_data` and the success/short-circuit return
# values are not visible in this excerpt.
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
# Inserts listen data built from the given params, then notifies/streams and
# federates the resulting activity.
# `local` is true unless params[:local] is exactly false.
# NOTE(review): construction of `listen_data` and the success return are not visible
# in this excerpt.
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
# Runs do_unfollow/4 inside a database transaction.
# NOTE(review): how the transaction result is unwrapped is not visible in this excerpt.
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body for unfollow/4: finds the latest follow activity between the two
# users, sets its state to "cancelled", inserts the unfollow activity built from it,
# then notifies/streams and federates. An `{:error, error}` result rolls the
# transaction back with that error.
# NOTE(review): the success return and any other fallback branches are not visible.
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
# Runs do_flag/1 inside a database transaction.
# NOTE(review): the `def` head and the handling of the transaction result are not
# visible in this excerpt.
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# NOTE(review): the function head for these lines is not visible in this excerpt;
# presumably this is the body of do_flag/1, which flag/1 runs in a transaction.
# Visible behaviour: `local` and `forward` are true unless the corresponding param is
# exactly false; the extra data gets an empty "to" and a "cc" holding either the
# reported account's ap_id or nothing (the selecting condition is not visible).
# The flag activity is inserted, its report status data stripped for federation, and
# every superuser with a non-nil email is sent an admin report email asynchronously.
# An `{:error, error}` result rolls the transaction back.
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
389 {:error, error} -> Repo.rollback(error)
# Announces that `origin` has moved to `target`. Proceeds only when the origin's
# ap_id is listed in `target.also_known_as`; otherwise returns
# `{:error, "Target account must have the origin in `alsoKnownAs`"}`.
# On success the activity is inserted, notified/streamed and federated, and a
# "move_following" background job is enqueued with both user ids.
# NOTE(review): the full params map and the success return are not visible here.
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities of a given type and context (matched on the
# activity data's 'type' and 'context'), newest id first. Applies the optional
# preloads, block/recipient/filter restrictions and excludes poll votes.
# When a user is given, recipients are the user's ap_id, their followings and the
# public address.
# NOTE(review): the recipients used without a user, and the values bound into the
# type/context fragment, are not visible in this excerpt.
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
437 "?->>'type' = ? and ?->>'context' = ?",
444 |> exclude_poll_votes(opts)
446 |> order_by([activity], desc: activity.id)
# Fetches the activities selected by fetch_activities_for_context_query/2.
# NOTE(review): the step that executes the query is not visible in this excerpt.
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(opts)
# Looks up an activity id among the "direct"-visibility activities of a context.
# Preloads are skipped by default (skip_preload: true, overridable through opts).
# NOTE(review): the limiting/selecting steps that pick the latest id are not visible
# in this excerpt.
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public address. Any :user in opts is
# dropped first, and unlisted activities are filtered out only when opts asks for it
# (see restrict_unlisted/2).
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but with :restrict_unlisted forced to
# true, so unlisted activities are excluded.
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
484 @valid_visibilities ~w[direct unlisted public private]
# Keeps only activities whose SQL `activity_visibility(...)` value matches the
# requested visibility: a list is compared with `= ANY`, a single value with `=`.
# Options without a :visibility key leave the query unchanged.
# NOTE(review): on the invalid-value paths the visible code ends with Logger.error/1
# (and one clause ignores `_query`), so those paths would not return a query -
# confirm how callers in a pipeline cope with that.
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
493 "activity_visibility(?, ?, ?) = ANY (?)",
501 Logger.error("Could not restrict visibility to #{visibility}")
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
519 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by :exclude_visibilities. Uses the same
# `activity_visibility(...)` comparison (list with `= ANY`, single value with `=`);
# values that are neither nil nor valid are logged with Logger.error/1.
# Options without the key leave the query unchanged.
# NOTE(review): the negation applied around the fragments and the values returned
# after logging are not visible in this excerpt.
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
528 "activity_visibility(?, ?, ?) = ANY (?)",
536 Logger.error("Could not exclude visibility to #{visibility}")
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
547 "activity_visibility(?, ?, ?) = ?",
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
562 defp exclude_visibility(query, _visibility), do: query
# Limits results to activities the given user may see according to the SQL function
# `thread_visibility(ap_id, activity id)`. Without a user the query is unchanged.
# The first two clauses match when the config or the user has
# skip_thread_containment: true.
# NOTE(review): the bodies of those two clauses are not visible here; presumably they
# return the query untouched.
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
577 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` (actor_id is set to the user's ap_id) as seen
# by `reading_user`. Recipients come from user_activities_recipients/1, which honours
# params[:godmode].
# NOTE(review): other params set along the way are not visible in this excerpt.
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
586 godmode: params[:godmode],
587 reading_user: reading_user
589 |> user_activities_recipients()
590 |> fetch_activities(params)
# Fetches the "Create" and "Announce" activities of `user` as seen by `reading_user`,
# passing along the user's pinned activity ids. Pagination defaults to :keyset unless
# params[:pagination_type] says otherwise.
# Whether `reading_user` blocks `user` decides if :blocking_user/:muting_user are set.
# NOTE(review): which branch of that check sets them is not visible in this excerpt.
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
603 if User.blocks?(reading_user, user) do
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
611 pagination_type = Map.get(params, :pagination_type) || :keyset
614 godmode: params[:godmode],
615 reading_user: reading_user
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
# Fetches "Create" and "Announce" activities visible to `reading_user` using offset
# pagination. Recipients come from user_activities_recipients/1, which honours
# params[:godmode].
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
626 godmode: params[:godmode],
627 reading_user: reading_user
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
# Computes the recipient list used when reading a user's activities. In godmode the
# list is empty (restrict_recipients/3 then applies no recipient filter).
# Otherwise it is either the public address plus the reading user's ap_id and
# followings, or the public address alone.
# NOTE(review): the condition choosing between the two lists is not visible here.
634 defp user_activities_recipients(%{godmode: true}), do: []
636 defp user_activities_recipients(%{reading_user: reading_user}) do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
640 [Constants.as_public()]
# Filters on the announced object's actor for :announce_filtering_user, using a
# condition of the form "type is not X or the object's actor is not Y".
# Needs the joined object, so it raises when combined with skip_preload: true.
# Without the option the query is unchanged.
# NOTE(review): the values bound into the fragment are not visible in this excerpt.
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
650 [activity, object] in query,
653 "?->>'type' != ? or ?->>'actor' != ?",
662 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities with an id greater than :since_id. An empty-string since_id
# or a missing key leaves the query unchanged.
664 defp restrict_since(query, %{since_id: ""}), do: query
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
670 defp restrict_since(query, _), do: query
# Excludes activities whose object's embedded 'tag' field contains any of the
# :tag_reject values (negated jsonb `?|`). A single string is wrapped into a list.
# Needs the joined object: raises when combined with skip_preload: true.
672 defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
673 raise_on_missing_preload()
676 defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
678 [_activity, object] in query,
679 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
683 defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
684 when is_binary(tag_reject) do
685 restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
688 defp restrict_embedded_tag_reject(query, _), do: query
# Keeps activities whose object's embedded 'tag' field contains all of the :tag_all
# values (jsonb `?&`). A single string is delegated to restrict_embedded_tag/2.
# Needs the joined object: raises when combined with skip_preload: true.
690 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
691 raise_on_missing_preload()
694 defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
696 [_activity, object] in query,
697 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
701 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
702 restrict_embedded_tag(query, %{tag: tag})
705 defp restrict_embedded_tag_all(query, _), do: query
# Keeps activities whose object's embedded 'tag' field contains any of the :tag
# values (jsonb `?|`). A single string is wrapped into a list.
# Needs the joined object: raises when combined with skip_preload: true.
707 defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do
708 raise_on_missing_preload()
711 defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do
713 [_activity, object] in query,
714 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
718 defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do
719 restrict_embedded_tag(query, %{tag: [tag]})
722 defp restrict_embedded_tag(query, _), do: query
# Returns the :tag, :tag_all and :tag_reject options, in that order, each wrapped
# into a list with List.wrap/1 (so a missing option becomes []).
724 defp hashtag_conditions(opts) do
725 [:tag, :tag_all, :tag_reject]
726 |> Enum.map(&opts[&1])
727 |> Enum.map(&List.wrap(&1))
# Applies the any/all/reject hashtag conditions in one pass: groups by all bindings,
# left-joins the object's hashtags and adds the aggregate conditions.
# `has_conditions` is true when at least one of the three tag lists is non-empty.
# Raises (missing preload) on the branch guarded by opts[:skip_preload].
# NOTE(review): the branch structure, including what happens without conditions, is
# only partly visible in this excerpt.
730 defp restrict_hashtag_agg(query, opts) do
731 [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
732 has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
738 opts[:skip_preload] ->
739 raise_on_missing_preload()
743 |> group_by_all_bindings()
744 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
745 |> maybe_restrict_hashtag_any(tag_any)
746 |> maybe_restrict_hashtag_all(tag_all)
747 |> maybe_restrict_hashtag_reject_any(tag_reject)
# The number of positional bindings to group on is derived from the number of
# entries in `query.aliases`: 4, 3 or 2 aliases select five, four or three bindings,
# and anything else falls back to grouping on the first two.
751 # Groups by all bindings to allow aggregation on hashtags
752 defp group_by_all_bindings(query) do
753 # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
755 Enum.count(query.aliases) == 4 ->
756 from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
758 Enum.count(query.aliases) == 3 ->
759 from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
761 Enum.count(query.aliases) == 2 ->
762 from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
765 from([a, o] in query, group_by: [a.id, o.id])
# For a non-empty tag list, adds a condition that the aggregated hashtag names
# overlap (`&&`) the given tags.
# NOTE(review): the body of the empty-list clause and the clause wrapping the
# fragment are not visible in this excerpt.
769 defp maybe_restrict_hashtag_any(query, []) do
773 defp maybe_restrict_hashtag_any(query, tags) do
777 fragment("array_agg(?) && (?)", hashtag.name, ^tags)
# For a non-empty tag list, adds a condition that the aggregated hashtag names
# contain (`@>`) all of the given tags.
# NOTE(review): the body of the empty-list clause and the clause wrapping the
# fragment are not visible in this excerpt.
781 defp maybe_restrict_hashtag_all(query, []) do
785 defp maybe_restrict_hashtag_all(query, tags) do
789 fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
# For a non-empty tag list, adds a condition that the aggregated hashtag names do
# not overlap (`not ... &&`) the given tags.
# NOTE(review): the body of the empty-list clause and the clause wrapping the
# fragment are not visible in this excerpt.
793 defp maybe_restrict_hashtag_reject_any(query, []) do
797 defp maybe_restrict_hashtag_reject_any(query, tags) do
801 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
# Excludes activities whose object has any of the :tag_reject hashtags: groups by
# all bindings, left-joins the object's hashtags and requires that the aggregated
# names do not overlap the rejected tags. A single string is wrapped into a list.
# Needs the joined object: raises when combined with skip_preload: true.
805 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
806 raise_on_missing_preload()
809 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
811 |> group_by_all_bindings()
812 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
815 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
819 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
820 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
823 defp restrict_hashtag_reject_any(query, _), do: query
# Requires every tag in :tag_all by applying restrict_hashtag_any/2 once per tag
# (one join per tag). A single string is handled as one tag.
# Needs the joined object: raises when combined with skip_preload: true.
# NOTE(review): the enclosing reduce call around the per-tag function is not visible.
825 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
826 raise_on_missing_preload()
829 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
833 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
837 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
838 restrict_hashtag_any(query, %{tag: tag})
841 defp restrict_hashtag_all(query, _), do: query
# Keeps activities whose object is joined to a hashtag named in :tag. A single
# string is wrapped into a list.
# Needs the joined object: raises when combined with skip_preload: true.
843 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
844 raise_on_missing_preload()
847 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
849 [_activity, object] in query,
850 join: hashtag in assoc(object, :hashtags),
851 where: hashtag.name in ^tags
855 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
856 restrict_hashtag_any(query, %{tag: [tag]})
859 defp restrict_hashtag_any(query, _), do: query
# Raises a RuntimeError explaining that a filter on the child object was requested
# while object preloading is disabled.
861 defp raise_on_missing_preload do
862 raise "Can't use the child object without preloading!"
# Keeps activities whose recipients overlap (`&&`) the given list. An empty list
# applies no filter. When a user is given, activities authored by that user are
# kept as well (or_where on the actor).
865 defp restrict_recipients(query, [], _user), do: query
867 defp restrict_recipients(query, recipients, nil) do
868 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
871 defp restrict_recipients(query, recipients, user) do
874 where: fragment("? && ?", ^recipients, activity.recipients),
875 or_where: activity.actor == ^user.ap_id
# With local_only: true keeps only activities flagged as local; otherwise the query
# is unchanged.
879 defp restrict_local(query, %{local_only: true}) do
880 from(activity in query, where: activity.local == true)
883 defp restrict_local(query, _), do: query
# Keeps only activities whose actor equals :actor_id; without that option the query
# is unchanged.
885 defp restrict_actor(query, %{actor_id: actor_id}) do
886 from(activity in query, where: activity.actor == ^actor_id)
889 defp restrict_actor(query, _), do: query
# Filters on the activity data's 'type': a string is compared for equality, any
# other value (a list of types, judging by callers in this file) with `= ANY`.
# Without a :type option the query is unchanged.
891 defp restrict_type(query, %{type: type}) when is_binary(type) do
892 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
895 defp restrict_type(query, %{type: type}) do
896 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
899 defp restrict_type(query, _), do: query
# Keeps only activities whose data 'state' equals :state; without that option the
# query is unchanged.
901 defp restrict_state(query, %{state: state}) do
902 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
905 defp restrict_state(query, _), do: query
# Keeps only activities whose object's 'likes' field contains the given ap_id
# (jsonb `?`). Uses the second (object) binding of the query.
# Without a :favorited_by option the query is unchanged.
907 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
909 [_activity, object] in query,
910 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
914 defp restrict_favorited_by(query, _), do: query
# With only_media: true keeps "Create" activities whose object's 'attachment' is not
# equal to an empty list. Needs the joined object, so any :only_media value combined
# with skip_preload: true raises. Otherwise the query is unchanged.
916 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
917 raise "Can't use the child object without preloading!"
920 defp restrict_media(query, %{only_media: true}) do
922 [activity, object] in query,
923 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
924 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
928 defp restrict_media(query, _), do: query
# Controls which replies appear in the results.
# exclude_replies: true keeps only activities whose object has no 'inReplyTo'.
930 defp restrict_replies(query, %{exclude_replies: true}) do
932 [_activity, object] in query,
933 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self": keeps non-replies, or activities where a bound value is
# contained in a bound array.
# NOTE(review): the bound values are not visible in this excerpt.
937 defp restrict_replies(query, %{
938 reply_filtering_user: %User{} = user,
939 reply_visibility: "self"
942 [activity, object] in query,
945 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following": the SQL below carries its own explanation; the list
# bound for the overlap test is the user's ap_id plus their cached friends' ap_ids.
953 defp restrict_replies(query, %{
954 reply_filtering_user: %User{} = user,
955 reply_visibility: "following"
958 [activity, object] in query,
962 ?->>'type' != 'Create' -- This isn't a Create
963 OR ?->>'inReplyTo' is null -- this isn't a reply
964 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
965 -- unless they are the author (because authors
966 -- are also part of the recipients). This leads
967 -- to a bug that self-replies by friends won't
969 OR ? = ? -- The actor is us
973 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
982 defp restrict_replies(query, _), do: query
# With exclude_reblogs: true drops activities whose data 'type' is 'Announce';
# otherwise the query is unchanged.
984 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
985 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
988 defp restrict_reblogs(query, _), do: query
# Hides activities from users muted by :muting_user. The muted ap_ids come from
# opts[:muted_users_ap_ids] when provided, otherwise from User.muted_users_ap_ids/1.
# Activities whose actor is muted are dropped, and a second condition concerns
# activities addressed ('to') to muted users, with an "or ? = ?" exception.
# Unless preloading is skipped, activities with a thread-mute row are dropped too.
# with_muted: true or a missing :muting_user leaves the query unchanged.
# NOTE(review): the values bound into the second fragment are not visible here.
990 defp restrict_muted(query, %{with_muted: true}), do: query
992 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
993 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
996 from([activity] in query,
997 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1000 "not (?->'to' \\?| ?) or ? = ?",
1008 unless opts[:skip_preload] do
1009 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1015 defp restrict_muted(query, _), do: query
# Hides activities related to users and domains blocked by :blocking_user. Blocked
# ap_ids come from opts[:blocked_users_ap_ids] when provided, otherwise from
# User.blocked_users_ap_ids/1; domain blocks default to [].
# The object is joined first when the query has no :object binding yet.
# Visible conditions: actor not blocked; recipients not overlapping an array (with
# an equality exception); recipients not containing blocked domains; no Announce
# addressed ('to') to a bound list; and host-based checks (third '/'-separated part
# of the actor URL) on both the activity and the object actor, each with an
# `= ANY` exception.
# NOTE(review): the values bound into most fragments are not visible here;
# `following_ap_ids` is presumably the exception list for the domain checks.
1017 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1018 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1019 domain_blocks = user.domain_blocks || []
1021 following_ap_ids = User.get_friends_ap_ids(user)
1024 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1027 [activity, object: o] in query,
1028 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1031 "((not (? && ?)) or ? = ?)",
1032 activity.recipients,
1039 "recipients_contain_blocked_domains(?, ?) = false",
1040 activity.recipients,
1045 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1052 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1060 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1069 defp restrict_blocked(query, _), do: query
# With restrict_unlisted: true drops activities whose 'cc' (an empty JSON object
# when missing) contains the public address; otherwise the query is unchanged.
# NOTE(review): the JSON value the fragment reads 'cc' from is not visible here.
1071 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1076 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1078 ^[Constants.as_public()]
1083 defp restrict_unlisted(query, _), do: query
# With pinned: true and a list of pinned ids, keeps only activities whose id is in
# that list; otherwise the query is unchanged.
1085 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1086 from(activity in query, where: activity.id in ^ids)
1089 defp restrict_pinned(query, _), do: query
# Drops 'Announce' activities matching the reblog-mute list of :muting_user. The list
# comes from opts[:reblog_muted_users_ap_ids] when provided, otherwise from
# User.reblog_muted_users_ap_ids/1. Without a muting user the query is unchanged.
# NOTE(review): the values bound into the fragment are not visible in this excerpt.
1091 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1092 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1098 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1106 defp restrict_muted_reblogs(query, _), do: query
# Keeps only activities whose actor URL has the given instance as its third
# '/'-separated part (the host). Applies only to a string :instance; otherwise the
# query is unchanged.
1108 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1111 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1115 defp restrict_instance(query, _), do: query
# Applies the user's content filters: with a regex composed by
# Filter.compose_regex/1, keeps activities whose object 'content' does not match it
# case-insensitively (`~*`), or that were authored by the user.
# A :blocking_user is treated like :user. Without either the query is unchanged.
# NOTE(review): the case branches (e.g. when no regex is composed) are not visible.
1117 defp restrict_filtered(query, %{user: %User{} = user}) do
1118 case Filter.compose_regex(user) do
1123 from([activity, object] in query,
1125 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1126 activity.actor == ^user.ap_id
1131 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1132 restrict_filtered(query, %{user: user})
1135 defp restrict_filtered(query, _), do: query
# Unless include_poll_votes: true is given, drops activities whose object 'type' is
# "Answer". The filter needs the :object named binding.
# NOTE(review): the branch taken when that binding is absent is not visible here.
1137 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1139 defp exclude_poll_votes(query, _) do
1140 if has_named_binding?(query, :object) do
1141 from([activity, object: o] in query,
1142 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Unless include_chat_messages: true is given, drops activities whose object 'type'
# is "ChatMessage". The filter needs the :object named binding.
# NOTE(review): the branch taken when that binding is absent is not visible here.
1149 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1151 defp exclude_chat_messages(query, _) do
1152 if has_named_binding?(query, :object) do
1153 from([activity, object: o] in query,
1154 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Unless invisible_actors: true is given, drops activities whose actor is one of the
# users selected by User.Query.build/1 with invisible: true.
# NOTE(review): the step that runs that user query is not visible in this excerpt.
1161 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1163 defp exclude_invisible_actors(query, _opts) do
1165 User.Query.build(%{invisible: true, select: [:ap_id]})
1167 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1169 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity with the given string id; without a string :exclude_id the
# query is unchanged.
1172 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1173 from(activity in query, where: activity.id != ^id)
1176 defp exclude_id(query, _), do: query
# Preloads the activity's object via Activity.with_preloaded_object/1 unless
# skip_preload: true is set.
1178 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1180 defp maybe_preload_objects(query, _) do
1182 |> Activity.with_preloaded_object()
# Preloads the bookmark of opts[:user] via Activity.with_preloaded_bookmark/2 unless
# skip_preload: true is set.
1185 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1187 defp maybe_preload_bookmarks(query, opts) do
1189 |> Activity.with_preloaded_bookmark(opts[:user])
# Preloads report notes via Activity.with_preloaded_report_notes/1 only when
# preload_report_notes: true is set; otherwise the query is unchanged.
1192 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1194 |> Activity.with_preloaded_report_notes()
1197 defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for opts[:muting_user] (falling back to opts[:user])
# via Activity.with_set_thread_muted_field/2, unless skip_preload: true is set.
1199 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1201 defp maybe_set_thread_muted_field(query, opts) do
1203 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id descending or ascending when opts[:order] is :desc or :asc; any other
# value leaves the query unordered here.
1206 defp maybe_order(query, %{order: :desc}) do
1208 |> order_by(desc: :id)
1211 defp maybe_order(query, %{order: :asc}) do
1213 |> order_by(asc: :id)
1216 defp maybe_order(query, _), do: query
# Fetches the muting user's outgoing relationship ap_ids (mutes and reblog mutes,
# plus blocks when the blocking user is the same user) in a single
# User.outgoing_relationships_ap_ids/2 call, and returns three option maps -
# for restrict_blocked/2, restrict_muted/2 and restrict_muted_reblogs/2 - that carry
# the preloaded lists. Keys already present in opts take precedence in the merge.
# NOTE(review): the else-branch of the :block check is not visible in this excerpt.
1218 defp fetch_activities_query_ap_ids_ops(opts) do
1219 source_user = opts[:muting_user]
1220 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1222 ap_id_relationships =
1223 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1224 [:block | ap_id_relationships]
1229 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1231 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1232 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1234 restrict_muted_reblogs_opts =
1235 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1237 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity timeline query for the given recipients and options by
# piping through every preload, ordering, restrict_* and exclude_* step in this
# module, then applies the hashtag filters according to
# Config.improved_hashtag_timeline():
#   - falsy: filter on the tags embedded in the object data;
#   - :prefer_aggregation: aggregated hashtag join;
#   - :avoid_aggregation, or when avoid_hashtags_aggregation?/1 holds: distinct
#     per-tag joins;
#   - otherwise: aggregated hashtag join.
# Thread containment reads [:instance, :skip_thread_containment] from Config.
# NOTE(review): restrict_filtered/2 appears twice in the pipeline; the second
# application looks redundant - confirm before removing.
# NOTE(review): the starting query and the binding that feeds the hashtag step are
# not visible in this excerpt.
1240 def fetch_activities_query(recipients, opts \\ %{}) do
1241 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1242 fetch_activities_query_ap_ids_ops(opts)
1245 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1250 |> maybe_preload_objects(opts)
1251 |> maybe_preload_bookmarks(opts)
1252 |> maybe_preload_report_notes(opts)
1253 |> maybe_set_thread_muted_field(opts)
1254 |> maybe_order(opts)
1255 |> restrict_recipients(recipients, opts[:user])
1256 |> restrict_replies(opts)
1257 |> restrict_since(opts)
1258 |> restrict_local(opts)
1259 |> restrict_actor(opts)
1260 |> restrict_type(opts)
1261 |> restrict_state(opts)
1262 |> restrict_favorited_by(opts)
1263 |> restrict_blocked(restrict_blocked_opts)
1264 |> restrict_muted(restrict_muted_opts)
1265 |> restrict_filtered(opts)
1266 |> restrict_media(opts)
1267 |> restrict_visibility(opts)
1268 |> restrict_thread_visibility(opts, config)
1269 |> restrict_reblogs(opts)
1270 |> restrict_pinned(opts)
1271 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1272 |> restrict_instance(opts)
1273 |> restrict_announce_object_actor(opts)
1274 |> restrict_filtered(opts)
1275 |> Activity.restrict_deactivated_users()
1276 |> exclude_poll_votes(opts)
1277 |> exclude_chat_messages(opts)
1278 |> exclude_invisible_actors(opts)
1279 |> exclude_visibility(opts)
1281 hashtag_timeline_strategy = Config.improved_hashtag_timeline()
1284 !hashtag_timeline_strategy ->
1286 |> restrict_embedded_tag(opts)
1287 |> restrict_embedded_tag_reject(opts)
1288 |> restrict_embedded_tag_all(opts)
1290 hashtag_timeline_strategy == :prefer_aggregation ->
1291 restrict_hashtag_agg(query, opts)
1293 hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) ->
1295 |> distinct([activity], true)
1296 |> restrict_hashtag_any(opts)
1297 |> restrict_hashtag_all(opts)
1298 |> restrict_hashtag_reject_any(opts)
1301 restrict_hashtag_agg(query, opts)
# Decides whether per-tag joins are cheap enough to avoid aggregation: true when
# there are no reject tags and the join count - one per :tag_all entry plus one if
# any :tag value is present - is at most 2.
1305 defp avoid_hashtags_aggregation?(opts) do
1306 [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
1308 joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0
1309 Enum.empty?(tag_reject) and joins_count <= 2
# Fetches a page of activities for the recipients, extended with the list
# memberships of opts[:user], and post-processes the page with maybe_update_cc/3.
# NOTE(review): a step between pagination and maybe_update_cc/3 is not visible in
# this excerpt.
1312 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1313 list_memberships = Pleroma.List.memberships(opts[:user])
1315 fetch_activities_query(recipients ++ list_memberships, opts)
1316 |> Pagination.fetch_paginated(opts, pagination)
1318 |> maybe_update_cc(list_memberships, opts[:user])
1322 Fetches the activities a user has favourited, ordered by when each was added to favourites.
# Selects "Like" activities by actor, joins each like's object and that object's
# activity, and returns the joined activity with its object attached and
# `pagination_id` set to the like's id. Results are ordered by like id, descending
# with nulls last; pagination is told to skip its own ordering (skip_order: true).
# NOTE(review): the start of the pipeline (how `user` is passed in) is not visible.
1324 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1325 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1327 |> Activity.Queries.by_actor()
1328 |> Activity.Queries.by_type("Like")
1329 |> Activity.with_joined_object()
1330 |> Object.with_joined_activity()
1331 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1332 |> order_by([like, _, _], desc_nulls_last: like.id)
1333 |> Pagination.fetch_paginated(
1334 Map.merge(params, %{skip_order: true}),
# Adds the viewing user's AP id to "cc" of every activity whose "bcc" contains one of
# the entries in `list_memberships`.
#
# * `activities`       - enumerable of activities (maps/structs with a `data` map).
# * `list_memberships` - addresses compared against each activity's "bcc".
# * user               - `%User{}` whose `ap_id` is prepended to "cc".
#
# Activities without a non-empty "bcc", or whose "bcc" shares nothing with
# `list_memberships`, are returned untouched. When `list_memberships` is empty or the
# third argument is not a `%User{}`, the activities are returned as they are.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a bare value; wrap it so the result is always
        # a proper list instead of an improper `[ap_id | nil]` cons cell.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities whose `recipients` array either overlaps (`&&`)
# `recipients`, or overlaps `recipients_with_public` while also containing the
# public address from `Constants.as_public/0`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
@doc """
Fetches one page of activities bounded by two recipient lists.

The base query is built with an empty recipient list and then restricted by
`fetch_activities_bounded_query/3` using `recipients` and `recipients_with_public`.
The paginated result is returned in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)

  Enum.reverse(page)
end
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the result.

When `opts[:actor]` is given it is added to the object data under "actor" via
`Maps.put_if_present/3`. Anything other than `{:ok, data}` returned by the store
step is passed back to the caller unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object = %Object{data: Maps.put_if_present(stored, "actor", opts[:actor])}
      Repo.insert(object)

    failure ->
      failure
  end
end
# Extracts a single URL from an actor's "url" property.
#
# Accepts a plain string, a map with a string "href", or a list of such values
# (only the first element is examined). Everything else, including an empty list,
# yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
# Builds the atom-keyed attribute map for a remote user from a fetched ActivityPub
# actor object (a string-keyed map).
#
# Actor documents come from remote servers, so malformed optional parts are skipped
# instead of raising:
#   * a non-map "icon"/"image" yields no avatar/banner;
#   * "attachment" and "tag" may be missing, null, a single map or a list;
#   * only attachments of type "PropertyValue" become profile fields;
#   * only "Emoji" tags with a string "name" and an "icon" map containing "url"
#     become custom emoji.
#
# Avatar, banner, fields, emoji, `is_locked` and `accepts_chat_messages` are read from
# the object as received; the remaining attributes are read after the object has gone
# through `Transmogrifier.maybe_fix_user_object/1`.
#
# Returns the `user_data` map below with `:nickname` added.
defp object_to_user_data(data) do
  icon_url = if is_map(data["icon"]), do: data["icon"]["url"]

  avatar =
    icon_url &&
      %{
        "type" => "Image",
        "url" => [%{"href" => icon_url}]
      }

  image_url = if is_map(data["image"]), do: data["image"]["url"]

  banner =
    image_url &&
      %{
        "type" => "Image",
        "url" => [%{"href" => image_url}]
      }

  # The generator pattern silently drops entries that are not "PropertyValue" maps.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  # Emoji names arrive wrapped in colons (":blobcat:"); they are stored without them.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
@doc """
Fetches the remote following/followers collections of `user` and summarises them.

`user` must expose `following_address` and `follower_address`. On success returns
`{:ok, map}` with `:follower_count` and `:following_count` (taken from each
collection's "totalItems", non-integers counted as 0) and `:hide_followers` /
`:hide_follows` (as reported by `collection_private/1`).

Any `{:error, reason}` from a step is returned as is; every other unexpected
value is wrapped as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_information = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_information}
  else
    {:error, _} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
# Returns `counter` when it is an integer; any other value (nil, strings, ...) becomes 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
@doc """
Optionally enriches `user_data` with follow information fetched from the remote
collections.

The fetch happens only when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is "Person" or "Service", and both
`:following_address` and `:follower_address` are set. On success the fetched
values are merged into `user_data[:info]`.

In every other case `user_data` is returned unchanged; results that are not one
of the expected "check failed" outcomes are logged as errors first.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): `object_to_user_data/1` in this module emits `:actor_type`, not
  # `:type`, so for data coming from there the type check below looks like it can
  # never pass. Confirm which callers supply `:type` and whether `:info` is still
  # consumed downstream.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched)

    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
# Works out whether a follow collection is private.
#
# * "first" embedded as a (Ordered)CollectionPage map         -> {:ok, false}
# * "first" is a reference: it is fetched, and
#     - a (Ordered)CollectionPage comes back                  -> {:ok, false}
#     - the fetch fails with HTTP status 401 or 403           -> {:ok, true}
#     - any other {:error, _}                                 -> returned as is
#     - anything else                                         -> {:error, value}
# * no "first" key at all                                     -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
@doc """
Runs an actor object through `MRF.filter/1` and converts it to user attributes.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`; any other filter
result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
@doc """
Fetches the actor at `ap_id` and turns it into user attributes.

Returns `{:ok, user_data}` (after `maybe_update_follow_information/1`) or
`{:error, reason}`. The failure is logged before it is returned: at debug level
for "Object has been deleted", at info level for `{:reject, reason}`, and at
error level for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, failure} ->
      case failure do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

        {:reject, reason} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      end

      {:error, failure}
  end
end
@doc """
Frees up `data[:nickname]` when a different, already stored user holds it.

If a user with that nickname exists under another AP id, that old user is renamed
to "<id>.<nickname>" and the result of the update is returned. If the stored user
has the same AP id, the situation is only logged. Returns `nil` when the nickname
is not a string or nobody holds it.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  new_ap_id = data[:ap_id]
  holder = if is_binary(nickname), do: User.get_by_nickname(nickname)

  case holder do
    %User{ap_id: old_ap_id} when old_ap_id == new_ap_id ->
      Logger.info(
        "Found an old user for #{nickname}, but the ap id #{new_ap_id} is the same as the new user. Race condition? Not changing anything."
      )

    %User{} = old_user ->
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{new_ap_id}, renaming."
      )

      renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

      old_user
      |> User.remote_user_changeset(renamed)
      |> User.update_and_set_cache()

    _ ->
      nil
  end
end
@doc """
Creates or refreshes the local record of the remote user identified by `ap_id`.

* A cached user that is not AP-enabled is handed to
  `Transmogrifier.upgrade_user_from_ap_id/1`.
* Otherwise the actor is fetched; a cached user is updated with the fetched data,
  while a new user is inserted after any nickname clash has been handled.

A failed fetch is returned to the caller unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached = User.get_cached_by_ap_id(ap_id)

  cond do
    cached && !User.ap_enabled?(cached) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        case cached do
          missing when missing in [nil, false] ->
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()

          existing ->
            existing
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
        end
      end
  end
end
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found.

Returns the result of `make_user_from_ap_id/1`, or
`{:error, "No AP id in WebFinger"}` when the lookup fails or yields no "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently this only applies
# `contain_broken_threads/2`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
@doc """
Builds a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end