1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Collects the addressing fields of an activity map.
# "Create" activities: to, cc, bcc and the actor are merged and de-duplicated.
# Missing "to"/"cc"/"bcc" keys default to [].
# Callers in this file destructure the result as `{recipients, _, _}`.
# NOTE(review): a missing "actor" defaults to [] and is then wrapped in a
# list, which would put an empty list among the recipients - confirm intent.
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
# Every other activity type: to, cc and bcc are concatenated as-is
# (the actor is not added and duplicates are kept).
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
# A nil actor is treated as active.
55 defp check_actor_is_active(nil), do: true
# For an AP id string the user is looked up with User.get_cached_by_ap_id/1;
# a found %User{} counts as active when its `deactivated` field is false.
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{deactivated: deactivated} -> not deactivated
# Returns true when the object's "content" is no longer (String.length/1)
# than the configured [:instance, :remote_limit] value.
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
# Activities without a non-nil object "content" always pass the check.
69 defp check_remote_limit(_), do: true
# Increases the actor's note count through User.increase_note_count/1 when
# is_public?(object) holds; otherwise returns {:ok, actor} without changes.
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Decreases the actor's note count through User.decrease_note_count/1 when
# is_public?(object) holds; otherwise returns {:ok, actor} without changes.
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# When the embedded object has an "inReplyTo" and is public, bumps the reply
# counter of the replied-to object via Object.increase_replies_count/1.
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
# Anything that does not match the reply shape above is a no-op.
88 defp increase_replies_count_if_reply(_create_data), do: :noop
# Types that persist/2 stores through Object.create/1 (first clause below).
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# persist/2, object clause: maps whose "type" is listed in @object_types are
# created with Object.create/1.
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
# persist/2, activity clause: everything else is inserted as an %Activity{}
# with the recipients from get_recipients/1 and the map's "actor".
# `meta` must contain :local (Keyword.fetch!/2 raises otherwise).
# On success returns {:ok, activity, meta}.
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
103 Repo.insert(%Activity{
106 recipients: recipients,
107 actor: object["actor"]
109 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
# Inserts an activity map.
#
# The `with` chain runs these steps in order:
#   1. Activity.normalize/1 must return nil (an existing %Activity{} falls
#      through to the `%Activity{} = activity` branch further down),
#   2. defaults are filled in by lazy_put_activity_defaults/2,
#   3. the actor must be active unless `bypass_actor_check` is truthy,
#   4. the remote content-length limit must pass,
#   5. the map goes through MRF.filter/1,
#   6. recipients are extracted,
#   7. a truthy `fake` stops the chain here ({:fake, true, map, recipients}),
#   8. Containment.contain_child/1 must return :ok,
#   9. the full object and then the activity itself are inserted.
#
# A failed remote-limit check is reported as {:error, :remote_limit}.
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 %Activity{} = activity ->
142 {:containment, _} = error ->
145 {:error, _} = error ->
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
153 recipients: recipients,
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
# Inserts the activity struct (actor taken from data["actor"], plus the given
# recipients) with Repo.insert/1 and, on success, hands the stored activity
# to maybe_create_activity_expiration/1.
168 defp insert_activity_with_expiration(data, local, recipients) do
172 actor: data["actor"],
173 recipients: recipients
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
# Creates notifications for the activity, creates or bumps its conversation
# on behalf of the activity's actor, and streams the resulting participations.
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
187 stream_out_participations(participations)
# When the activity data carries a %DateTime{} under "expires_at", enqueues a
# Pleroma.Workers.PurgeExpiredActivity job with the activity id and that time.
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
# Activities without such an "expires_at" are returned untouched.
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for the activity and, if `actor` resolves
# to a cached %User{}, marks that conversation as read for the user.
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
# For an {:ok, conversation} result, force-reloads the :participations
# association and returns it.
212 defp get_participations({:ok, conversation}) do
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
# Any other input yields an empty list.
218 defp get_participations(_), do: []
# Preloads :user on the given participations and pushes them to the
# "participation" stream.
220 def stream_out_participations(participations) do
223 |> Repo.preload(:user)
225 Streamer.stream("participation", participations)
# Two-argument variant: looks up the conversation for the object's "context",
# preloads its participations and re-streams them when a latest direct
# activity id exists for that conversation.
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
# Objects without a "context" (or any other input) are ignored.
246 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by Topics.get_activity_topics/1.
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
# Fallback clause for every other activity.
257 def stream_out(_activity) do
# Runs do_create/2 inside a Repo transaction; `fake` is forwarded unchanged.
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Builds the create data from the params and inserts it.
#
# `local` is true unless params[:local] is exactly `false`.
# `quick_insert?` is true when the configured [:env] is :benchmark.
#
# After a successful, non-fake insert the reply counter is bumped; unless
# quick_insert? is set, the actor's note count is then updated, notifications
# and streams are sent and the activity is federated. The {:quick_insert, ...}
# and {:fake, ...} tuples short-circuit the chain and are matched below.
# Repo.rollback/1 aborts the surrounding transaction.
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
277 %{to: to, actor: actor, published: published, context: context, object: object},
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
290 {:quick_insert, true, activity} ->
293 {:fake, true, activity} ->
297 Repo.rollback(message)
# Inserts the activity described by `listen_data` (built from the params),
# then notifies/streams and federates it.
# `local` is true unless params[:local] is exactly `false`.
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
310 %{to: to, actor: actor, published: published, context: context, object: object},
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
# Runs do_unfollow/4 inside a Repo transaction.
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Finds the latest follow activity between the two users, sets its state to
# "cancelled", inserts the data produced by make_unfollow_data/4, then
# notifies/streams and federates the new activity.
# An {:error, error} result rolls the surrounding transaction back.
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
340 {:error, error} -> Repo.rollback(error)
# Report ("flag") handling: the public entry point wraps do_flag/1 in a Repo
# transaction.
#
# `local` and `forward` are true unless explicitly passed as `false`.
# The "cc" of the flag is either [account.ap_id] or [] (the condition choosing
# between the two Map.merge/2 calls is not visible in this listing).
# After insertion the activity is stripped with strip_report_status_data/1 and
# the stripped copy is what gets federated. Every superuser with a non-nil
# email is then sent an admin report mail through Mailer.deliver_async/1.
# An {:error, error} result rolls the transaction back.
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
364 additional = params[:additional] || %{}
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 Map.merge(additional, %{"to" => [], "cc" => []})
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
389 {:error, error} -> Repo.rollback(error)
# Inserts an activity whose "actor" and "object" are the origin's AP id and
# whose "target" is the target's AP id.
#
# Proceeds only when origin.ap_id is listed in target.also_known_as; otherwise
# returns {:error, "Target account must have the origin in `alsoKnownAs`"}.
# On success the activity is notified/streamed and federated, and a
# "move_following" background job is enqueued with both user ids.
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the Ecto query for activities that share `context`.
#
# Recipients are the public address, plus (when opts[:user] is used) the user's
# AP id and followed addresses. The query applies the optional preloads, the
# block/blocker/recipient/filter restrictions, matches on activity type and
# context, excludes poll votes unless requested, and orders by id descending.
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_blockers_visibility(opts)
433 |> restrict_recipients(recipients, opts[:user])
434 |> restrict_filtered(opts)
438 "?->>'type' = ? and ?->>'context' = ?",
445 |> exclude_poll_votes(opts)
447 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using
# fetch_activities_for_context_query/2 with the given opts.
450 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
451 def fetch_activities_for_context(context, opts \\ %{}) do
453 |> fetch_activities_for_context_query(opts)
# Builds on the context query with `skip_preload: true` as a default (a value
# in `opts` takes precedence, since opts is the second Map.merge/2 argument)
# and restricts it to "direct" visibility.
457 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
458 FlakeId.Ecto.CompatType.t() | nil
459 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
461 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
462 |> restrict_visibility(%{visibility: "direct"})
# Fetches activities addressed to the public collection.
# :user is dropped from opts before querying; restrict_unlisted/2 only filters
# when opts carries `restrict_unlisted: true`. Results are paginated with the
# requested pagination type.
468 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
469 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
470 opts = Map.delete(opts, :user)
472 [Constants.as_public()]
473 |> fetch_activities_query(opts)
474 |> restrict_unlisted(opts)
475 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 with :restrict_unlisted forced
# to true.
478 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
479 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
481 |> Map.put(:restrict_unlisted, true)
482 |> fetch_public_or_unlisted_activities(pagination)
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
485 @valid_visibilities ~w[direct unlisted public private]
# List form: every entry must be a valid visibility; the query then matches
# activities whose activity_visibility(...) is any of them. An invalid entry
# is logged with Logger.error/1.
487 defp restrict_visibility(query, %{visibility: visibility})
488 when is_list(visibility) do
489 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
494 "activity_visibility(?, ?, ?) = ANY (?)",
502 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: matches activities with exactly that visibility.
506 defp restrict_visibility(query, %{visibility: visibility})
507 when visibility in @valid_visibilities do
511 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Single invalid visibility: logged as an error.
515 defp restrict_visibility(_query, %{visibility: visibility})
516 when visibility not in @valid_visibilities do
517 Logger.error("Could not restrict visibility to #{visibility}")
# No :visibility option: the query is returned unchanged.
520 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by the :exclude_visibilities
# option. List form: every entry must be a valid visibility, otherwise an
# error is logged.
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when is_list(visibility) do
524 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
529 "activity_visibility(?, ?, ?) = ANY (?)",
537 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
542 defp exclude_visibility(query, %{exclude_visibilities: visibility})
543 when visibility in @valid_visibilities do
548 "activity_visibility(?, ?, ?) = ?",
# Single invalid visibility: logged as an error. A nil value is deliberately
# not treated as invalid by this guard and falls through to the last clause.
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility not in [nil | @valid_visibilities] do
559 Logger.error("Could not exclude visibility to #{visibility}")
# No (or nil) :exclude_visibilities option: the query is returned unchanged.
563 defp exclude_visibility(query, _visibility), do: query
# Thread containment filter. The first two heads match when containment is
# skipped, either through the config map (third argument) or through the
# user's own `skip_thread_containment` flag.
# NOTE(review): their bodies are not visible here; presumably they return the
# query unchanged - confirm.
565 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
568 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
# For a user with an AP id, keeps only activities for which the SQL function
# thread_visibility(ap_id, activity id) returns true.
571 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
574 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
# No user in opts: the query is returned unchanged.
578 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` (:actor_id) as seen by `reading_user`
# (:user). Recipients come from user_activities_recipients/1, which honours
# params[:godmode].
580 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
583 |> Map.put(:user, reading_user)
584 |> Map.put(:actor_id, user.ap_id)
587 godmode: params[:godmode],
588 reading_user: reading_user
590 |> user_activities_recipients()
591 |> fetch_activities(params)
# Fetches the "Create" and "Announce" activities authored by `user` as seen by
# `reading_user`, passing along the user's pinned activity ids.
# Depending on User.blocks?(reading_user, user), :blocking_user and
# :muting_user are set to the reading user (the branch layout is not fully
# visible in this listing).
# Pagination uses params[:pagination_type], defaulting to :keyset.
595 def fetch_user_activities(user, reading_user, params \\ %{}) do
598 |> Map.put(:type, ["Create", "Announce"])
599 |> Map.put(:user, reading_user)
600 |> Map.put(:actor_id, user.ap_id)
601 |> Map.put(:pinned_activity_ids, user.pinned_activities)
604 if User.blocks?(reading_user, user) do
608 |> Map.put(:blocking_user, reading_user)
609 |> Map.put(:muting_user, reading_user)
612 pagination_type = Map.get(params, :pagination_type) || :keyset
615 godmode: params[:godmode],
616 reading_user: reading_user
618 |> user_activities_recipients()
619 |> fetch_activities(params, pagination_type)
# Fetches "Create" and "Announce" activities for `reading_user` using :offset
# pagination. Recipients come from user_activities_recipients/1.
623 def fetch_statuses(reading_user, params) do
624 params = Map.put(params, :type, ["Create", "Announce"])
627 godmode: params[:godmode],
628 reading_user: reading_user
630 |> user_activities_recipients()
631 |> fetch_activities(params, :offset)
# godmode: an empty recipient list (restrict_recipients/3 applies no
# restriction for []).
635 defp user_activities_recipients(%{godmode: true}), do: []
# Otherwise either the public address plus the reading user's AP id and
# followed addresses, or just the public address.
# NOTE(review): the condition selecting between the two is not visible here;
# presumably it checks whether reading_user is present - confirm.
637 defp user_activities_recipients(%{reading_user: reading_user}) do
639 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
641 [Constants.as_public()]
# The filter needs the joined child object, so combining
# :announce_filtering_user with `skip_preload: true` raises.
645 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
646 raise "Can't use the child object without preloading!"
# Keeps rows satisfying the "type != ? or actor != ?" fragment, using the
# filtering user's AP id as `actor`.
649 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
651 [activity, object] in query,
654 "?->>'type' != ? or ?->>'actor' != ?",
# No :announce_filtering_user option: the query is returned unchanged.
663 defp restrict_announce_object_actor(query, _), do: query
# An empty-string since_id is ignored.
665 defp restrict_since(query, %{since_id: ""}), do: query
# Keeps only activities whose id is greater than since_id.
667 defp restrict_since(query, %{since_id: since_id}) do
668 from(activity in query, where: activity.id > ^since_id)
# No :since_id option: the query is returned unchanged.
671 defp restrict_since(query, _), do: query
# Tag filters read the joined object, so they cannot be combined with
# `skip_preload: true`.
673 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
674 raise "Can't use the child object without preloading!"
# For a non-empty list, excludes activities whose object 'tag' contains any
# of the rejected tags (the `?|` operator, escaped for Ecto as `\\?|`).
677 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
679 [_activity, object] in query,
680 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
# Missing or empty :tag_reject: the query is returned unchanged.
684 defp restrict_tag_reject(query, _), do: query
# Tag filters read the joined object, so they cannot be combined with
# `skip_preload: true`.
686 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
# For a non-empty list, keeps activities whose object 'tag' contains all of
# the given tags (the `?&` operator, escaped for Ecto as `\\?&`).
690 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
692 [_activity, object] in query,
693 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
# Missing or empty :tag_all: the query is returned unchanged.
697 defp restrict_tag_all(query, _), do: query
# Tag filters read the joined object, so they cannot be combined with
# `skip_preload: true`.
699 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
700 raise "Can't use the child object without preloading!"
# List of tags: keeps activities whose object 'tag' contains any of them.
703 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
# Single tag string: keeps activities whose object 'tag' contains it.
710 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
712 [_activity, object] in query,
713 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
# No usable :tag option: the query is returned unchanged.
717 defp restrict_tag(query, _), do: query
# An empty recipient list applies no restriction.
719 defp restrict_recipients(query, [], _user), do: query
# Without a user: keeps activities whose recipients overlap (`&&`) with the
# given list.
721 defp restrict_recipients(query, recipients, nil) do
722 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: same overlap check, or the activity was authored by the user.
725 defp restrict_recipients(query, recipients, user) do
728 where: fragment("? && ?", ^recipients, activity.recipients),
729 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged as local.
733 defp restrict_local(query, %{local_only: true}) do
734 from(activity in query, where: activity.local == true)
# Otherwise the query is returned unchanged.
737 defp restrict_local(query, _), do: query
# Keeps only activities whose actor equals the given :actor_id.
739 defp restrict_actor(query, %{actor_id: actor_id}) do
740 from(activity in query, where: activity.actor == ^actor_id)
# No :actor_id option: the query is returned unchanged.
743 defp restrict_actor(query, _), do: query
# Single type string: exact match on the activity's 'type'.
745 defp restrict_type(query, %{type: type}) when is_binary(type) do
746 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
# Non-binary :type: matched with `= ANY(?)`, so a list of type strings is
# expected here.
749 defp restrict_type(query, %{type: type}) do
750 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
# No :type option: the query is returned unchanged.
753 defp restrict_type(query, _), do: query
# Keeps only activities whose data 'state' equals the given :state.
755 defp restrict_state(query, %{state: state}) do
756 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
# No :state option: the query is returned unchanged.
759 defp restrict_state(query, _), do: query
# Keeps activities whose joined object lists the given AP id in its 'likes'.
761 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
763 [_activity, object] in query,
764 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
# No :favorited_by option: the query is returned unchanged.
768 defp restrict_favorited_by(query, _), do: query
# The media filter reads the joined object, so it cannot be combined with
# `skip_preload: true`.
# NOTE(review): this head matches any :only_media value, so it also raises
# when only_media is false - confirm that is intended.
770 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
771 raise "Can't use the child object without preloading!"
# `only_media: true`: keeps "Create" activities whose object 'attachment' is
# not an empty list.
774 defp restrict_media(query, %{only_media: true}) do
776 [activity, object] in query,
777 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
778 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
# Otherwise the query is returned unchanged.
782 defp restrict_media(query, _), do: query
# `exclude_replies: true`: keeps only activities whose object has no
# 'inReplyTo'.
784 defp restrict_replies(query, %{exclude_replies: true}) do
786 [_activity, object] in query,
787 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self": keeps non-replies plus rows matching the
# `? = ANY(?)` check in the fragment below (its bound arguments are not
# visible in this listing).
791 defp restrict_replies(query, %{
792 reply_filtering_user: %User{} = user,
793 reply_visibility: "self"
796 [activity, object] in query,
799 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following": the conditions are documented inline in the
# SQL fragment; the overlap check uses the user's AP id together with
# User.get_cached_user_friends_ap_ids/1.
807 defp restrict_replies(query, %{
808 reply_filtering_user: %User{} = user,
809 reply_visibility: "following"
812 [activity, object] in query,
816 ?->>'type' != 'Create' -- This isn't a Create
817 OR ?->>'inReplyTo' is null -- this isn't a reply
818 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
819 -- unless they are the author (because authors
820 -- are also part of the recipients). This leads
821 -- to a bug that self-replies by friends won't
823 OR ? = ? -- The actor is us
827 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
# No reply-related option: the query is returned unchanged.
836 defp restrict_replies(query, _), do: query
# `exclude_reblogs: true`: drops activities of type 'Announce'.
838 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
839 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
# Otherwise the query is returned unchanged.
842 defp restrict_reblogs(query, _), do: query
# `with_muted: true` disables mute filtering entirely.
844 defp restrict_muted(query, %{with_muted: true}), do: query
# With a muting user: the muted AP ids come from opts[:muted_users_ap_ids] when
# preloaded, otherwise from User.muted_users_ap_ids/1. Activities by muted
# actors are dropped, and a second fragment checks the 'to' field against the
# mutes. Unless :skip_preload is set, rows that have a thread mute
# (tm.user_id not nil) are dropped as well.
846 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
847 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
850 from([activity] in query,
851 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
854 "not (?->'to' \\?| ?) or ? = ?",
862 unless opts[:skip_preload] do
863 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
# No muting user: the query is returned unchanged.
869 defp restrict_muted(query, _), do: query
# Applies the blocking user's block list to the query.
#
# Blocked AP ids come from opts[:blocked_users_ap_ids] when preloaded,
# otherwise from User.blocked_users_ap_ids/1; blocked domains come from
# user.domain_blocks (defaulting to []). The object join is added through
# Activity.with_joined_object/1 when the query has no :object binding yet.
# The individual conditions are explained by the inline comments below.
871 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
872 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
873 domain_blocks = user.domain_blocks || []
875 following_ap_ids = User.get_friends_ap_ids(user)
878 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
881 [activity, object: o] in query,
882 # You don't block the author
883 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
885 # You don't block any recipients, and didn't author the post
888 "((not (? && ?)) or ? = ?)",
895 # You don't block the domain of any recipients, and didn't author the post
898 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
905 # It's not a boost of a user you block
908 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
914 # You don't block the author's domain, and also don't follow the author
917 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
924 # Same as above, but checks the Object
927 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
# No blocking user: the query is returned unchanged.
936 defp restrict_blocked(query, _), do: query
# Hides content from users who block the given user, depending on the
# [:activitypub, :blockers_visible] setting. The blocker AP ids are the
# user's incoming :block relationships.
# NOTE(review): which `if` branch applies the filter is not visible here;
# presumably the filter runs when blockers_visible is not true - confirm.
938 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
939 if Config.get([:activitypub, :blockers_visible]) == true do
942 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
946 # The author doesn't block you
947 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
949 # It's not a boost of a user that blocks you
952 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
# No blocking user: the query is returned unchanged.
961 defp restrict_blockers_visibility(query, _), do: query
# `restrict_unlisted: true`: drops activities whose 'cc' (treated as empty
# when missing) contains the public address.
963 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
968 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
970 ^[Constants.as_public()]
# Otherwise the query is returned unchanged.
975 defp restrict_unlisted(query, _), do: query
# `pinned: true` together with :pinned_activity_ids: keeps only activities
# whose id is in that list.
977 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
978 from(activity in query, where: activity.id in ^ids)
# Otherwise the query is returned unchanged.
981 defp restrict_pinned(query, _), do: query
# Drops 'Announce' activities matched against the reblog-muted AP ids, which
# come from opts[:reblog_muted_users_ap_ids] when preloaded, otherwise from
# User.reblog_muted_users_ap_ids/1.
983 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
984 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
990 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
# No muting user: the query is returned unchanged.
998 defp restrict_muted_reblogs(query, _), do: query
# Keeps activities whose actor URL has `instance` as its third '/'-separated
# part, i.e. the host portion of the actor id.
1000 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1003 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
# No (binary) :instance option: the query is returned unchanged.
1007 defp restrict_instance(query, _), do: query
# Applies the user's content filters: a regex is composed with
# Filter.compose_regex/1 and objects whose 'content' matches it
# (case-insensitive `~*`) are dropped, except for the user's own activities.
1009 defp restrict_filtered(query, %{user: %User{} = user}) do
1010 case Filter.compose_regex(user) do
1015 from([activity, object] in query,
1017 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1018 activity.actor == ^user.ap_id
# A :blocking_user is treated the same way as :user.
1023 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1024 restrict_filtered(query, %{user: user})
# No user available: the query is returned unchanged.
1027 defp restrict_filtered(query, _), do: query
# `include_poll_votes: true` keeps poll votes in the result.
1029 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
# Otherwise, when the query has an :object binding, objects of type "Answer"
# are filtered out.
1031 defp exclude_poll_votes(query, _) do
1032 if has_named_binding?(query, :object) do
1033 from([activity, object: o] in query,
1034 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# `include_chat_messages: true` keeps chat messages in the result.
1041 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
# Otherwise, when the query has an :object binding, objects of type
# "ChatMessage" are filtered out.
1043 defp exclude_chat_messages(query, _) do
1044 if has_named_binding?(query, :object) do
1045 from([activity, object: o] in query,
1046 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# `invisible_actors: true` keeps activities of invisible actors.
1053 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
# Otherwise the AP ids of invisible users are collected (via User.Query.build/1
# with `invisible: true`) and activities by those actors are dropped.
1055 defp exclude_invisible_actors(query, _opts) do
1057 User.Query.build(%{invisible: true, select: [:ap_id]})
1059 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1061 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Drops the activity with the given id from the result.
1064 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1065 from(activity in query, where: activity.id != ^id)
# No (binary) :exclude_id option: the query is returned unchanged.
1068 defp exclude_id(query, _), do: query
# `skip_preload: true` leaves the query without the object preload.
1070 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
# Otherwise the object is preloaded with Activity.with_preloaded_object/1.
1072 defp maybe_preload_objects(query, _) do
1074 |> Activity.with_preloaded_object()
# `skip_preload: true` leaves the query without the bookmark preload.
1077 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
# Otherwise bookmarks are preloaded for opts[:user].
1079 defp maybe_preload_bookmarks(query, opts) do
1081 |> Activity.with_preloaded_bookmark(opts[:user])
# Report notes are preloaded only when `preload_report_notes: true` is given.
1084 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1086 |> Activity.with_preloaded_report_notes()
# Otherwise the query is returned unchanged.
1089 defp maybe_preload_report_notes(query, _), do: query
# `skip_preload: true` leaves the thread-muted field unset.
1091 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
# Otherwise the field is set for opts[:muting_user], falling back to
# opts[:user].
1093 defp maybe_set_thread_muted_field(query, opts) do
1095 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# `order: :desc` sorts by id descending.
1098 defp maybe_order(query, %{order: :desc}) do
1100 |> order_by(desc: :id)
# `order: :asc` sorts by id ascending.
1103 defp maybe_order(query, %{order: :asc}) do
1105 |> order_by(asc: :id)
# No :order option: no ordering is added here.
1108 defp maybe_order(query, _), do: query
# Preloads the relationship AP ids needed by the block/mute restrictions with a
# single User.outgoing_relationships_ap_ids/2 call.
#
# The source user is opts[:muting_user]; :mute and :reblog_mute are requested
# when it is set, and :block is added when opts[:blocking_user] is that same
# user. Returns three option maps (for restrict_blocked, restrict_muted and
# restrict_muted_reblogs) with the preloaded lists merged in; values already
# present in `opts` win because opts is the second Map.merge/2 argument.
1110 defp fetch_activities_query_ap_ids_ops(opts) do
1111 source_user = opts[:muting_user]
1112 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1114 ap_id_relationships =
1115 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1116 [:block | ap_id_relationships]
1121 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1123 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1124 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1126 restrict_muted_reblogs_opts =
1127 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1129 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main activity query for the given recipients.
#
# Each pipeline step below is a no-op unless its option is present in `opts`.
# The block, mute and reblog-mute steps receive option maps enriched with
# preloaded AP ids from fetch_activities_query_ap_ids_ops/1. Thread
# containment reads [:instance, :skip_thread_containment] from the config.
#
# NOTE(review): restrict_filtered(opts) appears twice in this pipeline (after
# restrict_muted and again after restrict_announce_object_actor); the second
# call looks redundant - confirm and remove one.
1132 def fetch_activities_query(recipients, opts \\ %{}) do
1133 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1134 fetch_activities_query_ap_ids_ops(opts)
1137 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1141 |> maybe_preload_objects(opts)
1142 |> maybe_preload_bookmarks(opts)
1143 |> maybe_preload_report_notes(opts)
1144 |> maybe_set_thread_muted_field(opts)
1145 |> maybe_order(opts)
1146 |> restrict_recipients(recipients, opts[:user])
1147 |> restrict_replies(opts)
1148 |> restrict_tag(opts)
1149 |> restrict_tag_reject(opts)
1150 |> restrict_tag_all(opts)
1151 |> restrict_since(opts)
1152 |> restrict_local(opts)
1153 |> restrict_actor(opts)
1154 |> restrict_type(opts)
1155 |> restrict_state(opts)
1156 |> restrict_favorited_by(opts)
1157 |> restrict_blocked(restrict_blocked_opts)
1158 |> restrict_blockers_visibility(opts)
1159 |> restrict_muted(restrict_muted_opts)
1160 |> restrict_filtered(opts)
1161 |> restrict_media(opts)
1162 |> restrict_visibility(opts)
1163 |> restrict_thread_visibility(opts, config)
1164 |> restrict_reblogs(opts)
1165 |> restrict_pinned(opts)
1166 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1167 |> restrict_instance(opts)
1168 |> restrict_announce_object_actor(opts)
1169 |> restrict_filtered(opts)
1170 |> Activity.restrict_deactivated_users()
1171 |> exclude_poll_votes(opts)
1172 |> exclude_chat_messages(opts)
1173 |> exclude_invisible_actors(opts)
1174 |> exclude_visibility(opts)
# Fetches a page of activities for the recipients.
# The list memberships of opts[:user] are appended to the recipients before
# querying, and the fetched activities are post-processed by
# maybe_update_cc/3 with those memberships.
1177 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1178 list_memberships = Pleroma.List.memberships(opts[:user])
1180 fetch_activities_query(recipients ++ list_memberships, opts)
1181 |> Pagination.fetch_paginated(opts, pagination)
1183 |> maybe_update_cc(list_memberships, opts[:user])
1187 Fetches the activities favourited by a user, ordered by when they were added to favourites.
# Selects "Like" activities by actor, joins each liked object and its
# activity, and returns that activity with the object attached. The Like's id
# is exposed as :pagination_id, and results are ordered by it (descending,
# nulls last). `skip_order: true` is passed to the paginator alongside the
# explicit ordering set here.
1189 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1190 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1192 |> Activity.Queries.by_actor()
1193 |> Activity.Queries.by_type("Like")
1194 |> Activity.with_joined_object()
1195 |> Object.with_joined_activity()
1196 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1197 |> order_by([like, _, _], desc_nulls_last: like.id)
1198 |> Pagination.fetch_paginated(
1199 Map.merge(params, %{skip_order: true}),
# For a user with at least one list membership: every activity whose non-empty
# "bcc" contains one of those memberships gets the user's AP id prepended to
# data["cc"].
1204 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1205 Enum.map(activities, fn
1206 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1207 if Enum.any?(bcc, &(&1 in list_memberships)) do
1208 update_in(activity.data["cc"], &[user_ap_id | &1])
# No memberships or no user: the activities are returned unchanged.
1218 defp maybe_update_cc(activities, _, _), do: activities
# Matches activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also listing the public address among their
# recipients.
1220 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1221 from(activity in query,
1223 fragment("? && ?", activity.recipients, ^recipients) or
1224 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1225 ^Constants.as_public() in activity.recipients)
# Fetches a page of activities using the bounded recipient rules of
# fetch_activities_bounded_query/3. The base query is built with an empty
# recipient list, so the only recipient restriction is the bounded one.
1229 def fetch_activities_bounded(
1231 recipients_with_public,
1233 pagination \\ :keyset
1235 fetch_activities_query([], opts)
1236 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1237 |> Pagination.fetch_paginated(opts, pagination)
# Stores the file with Upload.store/2 and inserts the resulting data as an
# %Object{}. When opts[:actor] is present it is added under "actor".
1241 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1242 def upload(file, opts \\ []) do
1243 with {:ok, data} <- Upload.store(file, opts) do
1244 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1246 Repo.insert(%Object{data: obj_data})
# Normalises an actor "url" field to a single URL string or nil:
# a binary is returned as-is, a map contributes its binary "href", lists are
# handled by their own clause, and anything else yields nil.
1250 @spec get_actor_url(any()) :: binary() | nil
1251 defp get_actor_url(url) when is_binary(url), do: url
1252 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1254 defp get_actor_url(url) when is_list(url) do
1260 defp get_actor_url(_url), do: nil
# Converts a remote actor object (string-keyed map) into user attributes.
#
# Visible mappings:
#   * "icon" / "image" urls are wrapped as [%{"href" => url}],
#   * "attachment" entries of type "PropertyValue" are reduced to their
#     "name" and "value",
#   * "tag" entries of type "Emoji" become a map of name (colons trimmed)
#     to icon url,
#   * "manuallyApprovesFollowers", "discoverable" and "invisible" default to
#     false, "type" defaults to "Person", "summary" to "",
#   * public key and shared inbox are read only when the nested values have
#     the expected map/binary shape,
#   * the nickname is "preferredUsername@host-of-id", or nil when there is no
#     "preferredUsername".
#
# NOTE(review): the "attachment" filter function has a single clause that
# requires a "type" key; an attachment without one would raise - confirm the
# input is guaranteed to carry it.
1262 defp object_to_user_data(data) do
1264 data["icon"]["url"] &&
1267 "url" => [%{"href" => data["icon"]["url"]}]
1271 data["image"]["url"] &&
1274 "url" => [%{"href" => data["image"]["url"]}]
1279 |> Map.get("attachment", [])
1280 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1281 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1285 |> Map.get("tag", [])
1287 %{"type" => "Emoji"} -> true
1290 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1291 {String.trim(name, ":"), url}
1294 is_locked = data["manuallyApprovesFollowers"] || false
1295 capabilities = data["capabilities"] || %{}
1296 accepts_chat_messages = capabilities["acceptsChatMessages"]
1297 data = Transmogrifier.maybe_fix_user_object(data)
1298 is_discoverable = data["discoverable"] || false
1299 invisible = data["invisible"] || false
1300 actor_type = data["type"] || "Person"
1303 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1304 data["publicKey"]["publicKeyPem"]
1310 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1311 data["endpoints"]["sharedInbox"]
1318 uri: get_actor_url(data["url"]),
1323 is_locked: is_locked,
1324 is_discoverable: is_discoverable,
1325 invisible: invisible,
1328 follower_address: data["followers"],
1329 following_address: data["following"],
1330 bio: data["summary"] || "",
1331 actor_type: actor_type,
1332 also_known_as: Map.get(data, "alsoKnownAs", []),
1333 public_key: public_key,
1334 inbox: data["inbox"],
1335 shared_inbox: shared_inbox,
1336 accepts_chat_messages: accepts_chat_messages
1339 # nickname can be nil because of virtual actors
1340 if data["preferredUsername"] do
1344 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1347 Map.put(user_data, :nickname, nil)
# Fetches the user's remote following and follower collections and derives
# the counters and privacy flags from them: the counts come from each
# collection's "totalItems" (normalised to an integer), and hide_follows /
# hide_followers come from collection_private/1.
# An {:error, _} from any step is returned as-is.
1351 def fetch_follow_information_for_user(user) do
1352 with {:ok, following_data} <-
1353 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1354 {:ok, hide_follows} <- collection_private(following_data),
1355 {:ok, followers_data} <-
1356 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1357 {:ok, hide_followers} <- collection_private(followers_data) do
1360 hide_follows: hide_follows,
1361 follower_count: normalize_counter(followers_data["totalItems"]),
1362 following_count: normalize_counter(following_data["totalItems"]),
1363 hide_followers: hide_followers
1366 {:error, _} = e -> e
# Coerces a counter value into an integer: integers are returned unchanged,
# every other term (nil, strings, ...) becomes 0.
1371 defp normalize_counter(counter) when is_integer(counter), do: counter
1372 defp normalize_counter(_), do: 0
# Optionally enriches `user_data` with follow information obtained through
# fetch_follow_information_for_user/1.
#
# The fetch is only attempted when all of the following hold:
#   * the [:instance, :external_user_synchronization] config value is true,
#   * user_data[:type] is "Person" or "Service",
#   * both :following_address and :follower_address are set (truthy).
# On success the fetched info is merged over user_data[:info] (or an empty
# map when that key is unset) and stored back under :info.
#
# Each precondition is wrapped in a tagged tuple ({:enabled, _},
# {:user_type_check, _}, {:collections_available, _}) so the failing step can
# be told apart afterwards; the bodies of those branches are not visible in
# this excerpt. The message built from user_data.ap_id and inspect(e) is
# presumably logged for any other failure - confirm against the full source.
#
# NOTE(review): the check reads user_data[:type], while the visible part of
# the map built by object_to_user_data/1 sets :actor_type. Confirm that :type
# is populated for the callers that are expected to pass this check.
1374 def maybe_update_follow_information(user_data) do
1375 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1376 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1378 {:collections_available,
1379 !!(user_data[:following_address] && user_data[:follower_address])},
1381 fetch_follow_information_for_user(user_data) do
1382 info = Map.merge(user_data[:info] || %{}, info)
1385 |> Map.put(:info, info)
1387 {:user_type_check, false} ->
1390 {:collections_available, false} ->
1393 {:enabled, false} ->
1398 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a fetched collection should be treated as private.
# The visible results are {:ok, true} or a passed-through error tuple.
#
# Clause 1: the collection embeds its "first" page as a map whose type is
# "CollectionPage" or "OrderedCollectionPage". The body of this clause is not
# visible in this excerpt.
1405 defp collection_private(%{"first" => %{"type" => type}})
1406 when type in ["CollectionPage", "OrderedCollectionPage"],
# Clause 2: "first" is any other value (presumably a reference to the page -
# confirm) and is fetched. A 401 or 403 response is treated as "private"
# rather than as a failure; other {:error, _} tuples are returned unchanged.
1409 defp collection_private(%{"first" => first}) do
1410 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1411 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1414 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1415 {:error, _} = e -> e
# Clause 3: a collection without a "first" key is considered private.
1420 defp collection_private(_data), do: {:ok, true}
# Runs the actor document through MRF.filter/1 and, when that returns
# {:ok, data}, converts the filtered document with object_to_user_data/1 and
# returns {:ok, user_data}.
# NOTE(review): the handling of a non-{:ok, _} filter result is not visible in
# this excerpt.
1422 def user_data_from_user_object(data) do
1423 with {:ok, data} <- MRF.filter(data) do
1424 {:ok, object_to_user_data(data)}
# Fetches the actor document at `ap_id`, converts it with
# user_data_from_user_object/1 and returns
# {:ok, maybe_update_follow_information(data)}.
#
# Failures are logged at different levels depending on their cause:
#   * {:error, "Object has been deleted"} - debug
#   * {:error, {:reject, reason}}         - info
#   * the remaining visible message       - error
# NOTE(review): the values returned from these failure branches are not
# visible in this excerpt.
1430 def fetch_and_prepare_user_from_ap_id(ap_id) do
1431 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1432 {:ok, data} <- user_data_from_user_object(data) do
1433 {:ok, maybe_update_follow_information(data)}
1435 # If this has been deleted, only log a debug and not an error
1436 {:error, "Object has been deleted" = e} ->
1437 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1440 {:error, {:reject, reason} = e} ->
1441 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1445 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Frees up a nickname that is already taken by a different user.
#
# When data[:nickname] is a binary, User.get_by_nickname/1 finds a user with
# that nickname, and that user's ap_id differs from data[:ap_id], the old
# user's nickname is changed to "<old id>.<old nickname>" through
# User.remote_user_changeset/2 and User.update_and_set_cache/1.
#
# When the ap_ids are equal, nothing is changed; the visible message
# describes this as a possible race condition.
# NOTE(review): the calls that emit the two messages are not visible in this
# excerpt (presumably Logger calls - confirm).
1450 def maybe_handle_clashing_nickname(data) do
1451 with nickname when is_binary(nickname) <- data[:nickname],
1452 %User{} = old_user <- User.get_by_nickname(nickname),
1453 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1455 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1461 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1462 |> User.update_and_set_cache()
1464 {:ap_id_comparison, true} ->
1466 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Creates or refreshes the user record for `ap_id`.
#
# A cached user for which User.ap_enabled?/1 is false is handed to
# Transmogrifier.upgrade_user_from_ap_id/1. Otherwise the user data is
# fetched with fetch_and_prepare_user_from_ap_id/1.
#
# NOTE(review): the branching after the fetch is only partly visible. The
# excerpt shows one pipeline ending in User.remote_user_changeset(data) and
# User.update_and_set_cache(), and a second path that first calls
# maybe_handle_clashing_nickname(data) and then pipes into
# User.remote_user_changeset/1. Presumably the former updates an existing
# user and the latter creates a new one - confirm against the full source.
1474 def make_user_from_ap_id(ap_id) do
1475 user = User.get_cached_by_ap_id(ap_id)
1477 if user && !User.ap_enabled?(user) do
1478 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1480 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1483 |> User.remote_user_changeset(data)
1484 |> User.update_and_set_cache()
1486 maybe_handle_clashing_nickname(data)
1489 |> User.remote_user_changeset()
# Resolves `nickname` with WebFinger.finger/1 and, when the result is
# {:ok, map} with a non-nil "ap_id", delegates to make_user_from_ap_id/1.
# Any other WebFinger result yields {:error, "No AP id in WebFinger"}.
1497 def make_user_from_nickname(nickname) do
1498 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1499 make_user_from_ap_id(ap_id)
1501 _e -> {:error, "No AP id in WebFinger"}
1505 # filter out broken threads
# Returns the result of entire_thread_visible_for_user?/2 for the given
# activity and user; both arguments must be structs (%Activity{}, %User{}).
1506 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1507 entire_thread_visible_for_user?(activity, user)
1510 # do post-processing on a specific activity
# Public entry point; currently the only visible step is the delegation to
# contain_broken_threads/2, whose result is returned.
1511 def contain_activity(%Activity{} = activity, %User{} = user) do
1512 contain_broken_threads(activity, user)
# Builds a query restricted to activities of type "Create" with visibility
# "direct", ordered by ascending activity id.
# NOTE(review): the head of the pipeline (the queryable these restrictions
# are applied to) is not visible in this excerpt.
1515 def fetch_direct_messages_query do
1517 |> restrict_type(%{type: "Create"})
1518 |> restrict_visibility(%{visibility: "direct"})
1519 |> order_by([activity], asc: activity.id)