1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Checks the object's content against the `[:instance, :remote_limit]` setting.
# Returns `true` when the content length (per `String.length/1`) does not
# exceed the limit, or when the activity carries no non-nil content at all.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?(object)` holds.

For any other object the actor is returned untouched as `{:ok, actor}`.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?(object)` holds.

For any other object the actor is returned untouched as `{:ok, actor}`.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
127 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 %Activity{} = activity ->
138 {:containment, _} = error ->
141 {:error, _} = error ->
144 {:fake, true, map, recipients} ->
145 activity = %Activity{
149 recipients: recipients,
153 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
156 {:remote_limit_pass, _} ->
157 {:error, :remote_limit}
164 defp insert_activity_with_expiration(data, local, recipients) do
168 actor: data["actor"],
169 recipients: recipients
172 with {:ok, activity} <- Repo.insert(struct) do
173 maybe_create_activity_expiration(activity)
177 def notify_and_stream(activity) do
178 Notification.create_notifications(activity)
180 conversation = create_or_bump_conversation(activity, activity.actor)
181 participations = get_participations(conversation)
183 stream_out_participations(participations)
186 defp maybe_create_activity_expiration(
187 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
190 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
191 activity_id: activity.id,
192 expires_at: expires_at
198 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
200 defp create_or_bump_conversation(activity, actor) do
201 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
202 %User{} = user <- User.get_cached_by_ap_id(actor) do
203 Participation.mark_as_read(user, conversation)
# Returns the participations of a conversation wrapped in `{:ok, conversation}`,
# force-reloading the association first. Any other argument yields `[]`.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)

  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
216 def stream_out_participations(participations) do
219 |> Repo.preload(:user)
221 Streamer.stream("participation", participations)
224 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
225 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
226 conversation = Repo.preload(conversation, :participations)
229 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
240 def stream_out_participations(_, _), do: :noop
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
249 def stream_out(_activity) do
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # only accept false as false value
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
269 %{to: to, actor: actor, published: published, context: context, object: object},
273 with {:ok, activity} <- insert(create_data, local, fake),
274 {:fake, false, activity} <- {:fake, fake, activity},
275 _ <- increase_replies_count_if_reply(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 _ <- notify_and_stream(activity),
279 :ok <- maybe_federate(activity) do
282 {:quick_insert, true, activity} ->
285 {:fake, true, activity} ->
289 Repo.rollback(message)
293 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
295 additional = params[:additional] || %{}
296 # only accept false as false value
297 local = !(params[:local] == false)
298 published = params[:published]
302 %{to: to, actor: actor, published: published, context: context, object: object},
306 with {:ok, activity} <- insert(listen_data, local),
307 _ <- notify_and_stream(activity),
308 :ok <- maybe_federate(activity) do
313 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
314 {:ok, Activity.t()} | nil | {:error, any()}
315 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
316 with {:ok, result} <-
317 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
322 defp do_unfollow(follower, followed, activity_id, local) do
323 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
324 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
325 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
326 {:ok, activity} <- insert(unfollow_data, local),
327 _ <- notify_and_stream(activity),
328 :ok <- maybe_federate(activity) do
332 {:error, error} -> Repo.rollback(error)
336 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 # only accept false as false value
347 local = !(params[:local] == false)
348 forward = !(params[:forward] == false)
350 additional = params[:additional] || %{}
354 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
356 Map.merge(additional, %{"to" => [], "cc" => []})
359 with flag_data <- make_flag_data(params, additional),
360 {:ok, activity} <- insert(flag_data, local),
361 {:ok, stripped_activity} <- strip_report_status_data(activity),
362 _ <- notify_and_stream(activity),
363 :ok <- maybe_federate(stripped_activity) do
364 User.all_superusers()
365 |> Enum.filter(fn user -> not is_nil(user.email) end)
366 |> Enum.each(fn superuser ->
368 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
369 |> Pleroma.Emails.Mailer.deliver_async()
376 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
377 def move(%User{} = origin, %User{} = target, local \\ true) do
380 "actor" => origin.ap_id,
381 "object" => origin.ap_id,
382 "target" => target.ap_id
385 with true <- origin.ap_id in target.also_known_as,
386 {:ok, activity} <- insert(params, local),
387 _ <- notify_and_stream(activity) do
388 maybe_federate(activity)
390 BackgroundWorker.enqueue("move_following", %{
391 "origin_id" => origin.id,
392 "target_id" => target.id
397 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
402 def fetch_activities_for_context_query(context, opts) do
403 public = [Constants.as_public()]
407 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
410 from(activity in Activity)
411 |> maybe_preload_objects(opts)
412 |> maybe_preload_bookmarks(opts)
413 |> maybe_set_thread_muted_field(opts)
414 |> restrict_blocked(opts)
415 |> restrict_recipients(recipients, opts[:user])
416 |> restrict_filtered(opts)
420 "?->>'type' = ? and ?->>'context' = ?",
427 |> exclude_poll_votes(opts)
429 |> order_by([activity], desc: activity.id)
432 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
433 def fetch_activities_for_context(context, opts \\ %{}) do
435 |> fetch_activities_for_context_query(opts)
439 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
440 FlakeId.Ecto.CompatType.t() | nil
441 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
443 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
444 |> restrict_visibility(%{visibility: "direct"})
450 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
451 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
452 opts = Map.delete(opts, :user)
454 [Constants.as_public()]
455 |> fetch_activities_query(opts)
456 |> restrict_unlisted(opts)
457 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Fetches public activities.

Delegates to `fetch_public_or_unlisted_activities/2` after forcing
`restrict_unlisted: true` into `opts`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)

  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
467 @valid_visibilities ~w[direct unlisted public private]
469 defp restrict_visibility(query, %{visibility: visibility})
470 when is_list(visibility) do
471 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
476 "activity_visibility(?, ?, ?) = ANY (?)",
484 Logger.error("Could not restrict visibility to #{visibility}")
488 defp restrict_visibility(query, %{visibility: visibility})
489 when visibility in @valid_visibilities do
493 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
497 defp restrict_visibility(_query, %{visibility: visibility})
498 when visibility not in @valid_visibilities do
499 Logger.error("Could not restrict visibility to #{visibility}")
502 defp restrict_visibility(query, _visibility), do: query
504 defp exclude_visibility(query, %{exclude_visibilities: visibility})
505 when is_list(visibility) do
506 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
511 "activity_visibility(?, ?, ?) = ANY (?)",
519 Logger.error("Could not exclude visibility to #{visibility}")
524 defp exclude_visibility(query, %{exclude_visibilities: visibility})
525 when visibility in @valid_visibilities do
530 "activity_visibility(?, ?, ?) = ?",
539 defp exclude_visibility(query, %{exclude_visibilities: visibility})
540 when visibility not in [nil | @valid_visibilities] do
541 Logger.error("Could not exclude visibility to #{visibility}")
545 defp exclude_visibility(query, _visibility), do: query
547 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
550 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
553 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
556 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
560 defp restrict_thread_visibility(query, _, _), do: query
562 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
565 |> Map.put(:user, reading_user)
566 |> Map.put(:actor_id, user.ap_id)
569 godmode: params[:godmode],
570 reading_user: reading_user
572 |> user_activities_recipients()
573 |> fetch_activities(params)
577 def fetch_user_activities(user, reading_user, params \\ %{}) do
580 |> Map.put(:type, ["Create", "Announce"])
581 |> Map.put(:user, reading_user)
582 |> Map.put(:actor_id, user.ap_id)
583 |> Map.put(:pinned_activity_ids, user.pinned_activities)
586 if User.blocks?(reading_user, user) do
590 |> Map.put(:blocking_user, reading_user)
591 |> Map.put(:muting_user, reading_user)
595 godmode: params[:godmode],
596 reading_user: reading_user
598 |> user_activities_recipients()
599 |> fetch_activities(params)
603 def fetch_statuses(reading_user, params) do
604 params = Map.put(params, :type, ["Create", "Announce"])
607 godmode: params[:godmode],
608 reading_user: reading_user
610 |> user_activities_recipients()
611 |> fetch_activities(params, :offset)
615 defp user_activities_recipients(%{godmode: true}), do: []
617 defp user_activities_recipients(%{reading_user: reading_user}) do
619 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
621 [Constants.as_public()]
625 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
626 raise "Can't use the child object without preloading!"
629 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
631 [activity, object] in query,
634 "?->>'type' != ? or ?->>'actor' != ?",
643 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `since_id`.
# A missing or empty-string `since_id` leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
# Excludes activities whose joined object carries any of the `tag_reject` tags
# (jsonb `?|`, escaped as `\\?|` inside the fragment).
# The filter reads the joined object, so combining it with `skip_preload: true`
# raises. An empty or absent `tag_reject` leaves the query untouched.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
# Keeps only activities whose joined object carries every tag in `tag_all`
# (jsonb `?&`, escaped as `\\?&` inside the fragment).
# The filter reads the joined object, so combining it with `skip_preload: true`
# raises. An empty or absent `tag_all` leaves the query untouched.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
# Restricts the query by the `tag` entry of the joined object's data.
# A list keeps objects carrying at least one of the tags (jsonb `?|`); a single
# binary keeps objects carrying that tag (jsonb `?`). Both operators are
# escaped with `\\` inside the fragment.
# Combining a `:tag` filter with `skip_preload: true` raises, since the joined
# object is required. Anything else leaves the query untouched.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
# Restricts the query to activities whose `recipients` array overlaps the given
# list (`&&`). With a user, activities authored by that user are additionally
# admitted through an `or_where` on the actor.
# An empty recipient list applies no restriction at all.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
# With `local_only: true`, keeps only activities flagged `local`.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
# With an `:actor_id` option, keeps only activities whose actor equals it.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
# Restricts the query by the activity's `type`.
# A binary is compared for equality; any other `:type` value is handed to
# `ANY(?)` (presumably a list of type names - callers visible here pass lists).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
# With a `:state` option, keeps only activities whose data `state` equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
# With a `:favorited_by` AP id, keeps only activities whose joined object lists
# that id under `likes` (jsonb `?`, escaped as `\\?` inside the fragment).
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps only `Create` activities whose joined object
# has an `attachment` different from the empty list.
# Any `:only_media` option combined with `skip_preload: true` raises, since the
# joined object is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
764 defp restrict_replies(query, %{exclude_replies: true}) do
766 [_activity, object] in query,
767 where: fragment("?->>'inReplyTo' is null", object.data)
771 defp restrict_replies(query, %{
772 reply_filtering_user: %User{} = user,
773 reply_visibility: "self"
776 [activity, object] in query,
779 "?->>'inReplyTo' is null OR ? = ANY(?)",
787 defp restrict_replies(query, %{
788 reply_filtering_user: %User{} = user,
789 reply_visibility: "following"
792 [activity, object] in query,
796 ?->>'type' != 'Create' -- This isn't a Create
797 OR ?->>'inReplyTo' is null -- this isn't a reply
798 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
799 -- unless they are the author (because authors
800 -- are also part of the recipients). This leads
801 -- to a bug that self-replies by friends won't
803 OR ? = ? -- The actor is us
807 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
816 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities of type `Announce`.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
824 defp restrict_muted(query, %{with_muted: true}), do: query
826 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
827 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
830 from([activity] in query,
831 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
834 "not (?->'to' \\?| ?) or ? = ?",
842 unless opts[:skip_preload] do
843 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
849 defp restrict_muted(query, _), do: query
851 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
852 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
853 domain_blocks = user.domain_blocks || []
855 following_ap_ids = User.get_friends_ap_ids(user)
858 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
861 [activity, object: o] in query,
862 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
865 "((not (? && ?)) or ? = ?)",
873 "recipients_contain_blocked_domains(?, ?) = false",
879 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
886 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
894 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
903 defp restrict_blocked(query, _), do: query
905 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
910 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
912 ^[Constants.as_public()]
917 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and a list of `pinned_activity_ids`, keeps only the
# activities whose id is in that list.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _), do: query
925 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
926 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
932 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
940 defp restrict_muted_reblogs(query, _), do: query
942 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
945 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
949 defp restrict_instance(query, _), do: query
951 defp restrict_filtered(query, %{user: %User{} = user}) do
952 case Filter.compose_regex(user) do
957 from([activity, object] in query,
959 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
960 activity.actor == ^user.ap_id
965 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
966 restrict_filtered(query, %{user: user})
969 defp restrict_filtered(query, _), do: query
971 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
973 defp exclude_poll_votes(query, _) do
974 if has_named_binding?(query, :object) do
975 from([activity, object: o] in query,
976 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
983 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
985 defp exclude_chat_messages(query, _) do
986 if has_named_binding?(query, :object) do
987 from([activity, object: o] in query,
988 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
995 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
997 defp exclude_invisible_actors(query, _opts) do
999 User.Query.build(%{invisible: true, select: [:ap_id]})
1001 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1003 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary `:exclude_id`, drops the activity carrying that id.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _), do: query
# Applies `Activity.with_preloaded_object/1` unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _) do
  Activity.with_preloaded_object(query)
end
# Applies `Activity.with_preloaded_bookmark/2` for `opts[:user]` unless
# `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
# Applies `Activity.with_preloaded_report_notes/1` only when explicitly
# requested through `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
# Applies `Activity.with_set_thread_muted_field/2` unless `skip_preload: true`
# is set. The user passed along is `opts[:muting_user]`, falling back to
# `opts[:user]`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts[:muting_user] || opts[:user])
end
# Orders the query by `id` when `:order` is `:desc` or `:asc`; any other value
# (or no `:order` at all) leaves the query's ordering untouched.
defp maybe_order(query, %{order: :desc}) do
  order_by(query, desc: :id)
end

defp maybe_order(query, %{order: :asc}) do
  order_by(query, asc: :id)
end

defp maybe_order(query, _), do: query
1052 defp fetch_activities_query_ap_ids_ops(opts) do
1053 source_user = opts[:muting_user]
1054 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1056 ap_id_relationships =
1057 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1058 [:block | ap_id_relationships]
1063 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1065 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1066 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1068 restrict_muted_reblogs_opts =
1069 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1071 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1074 def fetch_activities_query(recipients, opts \\ %{}) do
1075 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1076 fetch_activities_query_ap_ids_ops(opts)
1079 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1083 |> maybe_preload_objects(opts)
1084 |> maybe_preload_bookmarks(opts)
1085 |> maybe_preload_report_notes(opts)
1086 |> maybe_set_thread_muted_field(opts)
1087 |> maybe_order(opts)
1088 |> restrict_recipients(recipients, opts[:user])
1089 |> restrict_replies(opts)
1090 |> restrict_tag(opts)
1091 |> restrict_tag_reject(opts)
1092 |> restrict_tag_all(opts)
1093 |> restrict_since(opts)
1094 |> restrict_local(opts)
1095 |> restrict_actor(opts)
1096 |> restrict_type(opts)
1097 |> restrict_state(opts)
1098 |> restrict_favorited_by(opts)
1099 |> restrict_blocked(restrict_blocked_opts)
1100 |> restrict_muted(restrict_muted_opts)
1101 |> restrict_filtered(opts)
1102 |> restrict_media(opts)
1103 |> restrict_visibility(opts)
1104 |> restrict_thread_visibility(opts, config)
1105 |> restrict_reblogs(opts)
1106 |> restrict_pinned(opts)
1107 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1108 |> restrict_instance(opts)
1109 |> restrict_announce_object_actor(opts)
1110 |> restrict_filtered(opts)
1111 |> Activity.restrict_deactivated_users()
1112 |> exclude_poll_votes(opts)
1113 |> exclude_chat_messages(opts)
1114 |> exclude_invisible_actors(opts)
1115 |> exclude_visibility(opts)
1118 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1119 list_memberships = Pleroma.List.memberships(opts[:user])
1121 fetch_activities_query(recipients ++ list_memberships, opts)
1122 |> Pagination.fetch_paginated(opts, pagination)
1124 |> maybe_update_cc(list_memberships, opts[:user])
1128 Fetches the user's favorited activities, ordered by when they were added to favorites.
1130 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1131 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1133 |> Activity.Queries.by_actor()
1134 |> Activity.Queries.by_type("Like")
1135 |> Activity.with_joined_object()
1136 |> Object.with_joined_activity()
1137 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1138 |> order_by([like, _, _], desc_nulls_last: like.id)
1139 |> Pagination.fetch_paginated(
1140 Map.merge(params, %{skip_order: true}),
1145 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1146 Enum.map(activities, fn
1147 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1148 if Enum.any?(bcc, &(&1 in list_memberships)) do
1149 update_in(activity.data["cc"], &[user_ap_id | &1])
1159 defp maybe_update_cc(activities, _, _), do: activities
# Narrows `query` to activities that either overlap `recipients`, or overlap
# `recipients_with_public` while also listing the public collection
# (`Constants.as_public/0`) among their recipients.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1170 def fetch_activities_bounded(
1172 recipients_with_public,
1174 pagination \\ :keyset
1176 fetch_activities_query([], opts)
1177 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1178 |> Pagination.fetch_paginated(opts, pagination)
1182 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1183 def upload(file, opts \\ []) do
1184 with {:ok, data} <- Upload.store(file, opts) do
1185 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1187 Repo.insert(%Object{data: obj_data})
1191 @spec get_actor_url(any()) :: binary() | nil
1192 defp get_actor_url(url) when is_binary(url), do: url
1193 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1195 defp get_actor_url(url) when is_list(url) do
1201 defp get_actor_url(_url), do: nil
1203 defp object_to_user_data(data) do
1205 data["icon"]["url"] &&
1208 "url" => [%{"href" => data["icon"]["url"]}]
1212 data["image"]["url"] &&
1215 "url" => [%{"href" => data["image"]["url"]}]
1220 |> Map.get("attachment", [])
1221 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1222 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1226 |> Map.get("tag", [])
1228 %{"type" => "Emoji"} -> true
1231 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1232 {String.trim(name, ":"), url}
1235 is_locked = data["manuallyApprovesFollowers"] || false
1236 capabilities = data["capabilities"] || %{}
1237 accepts_chat_messages = capabilities["acceptsChatMessages"]
1238 data = Transmogrifier.maybe_fix_user_object(data)
1239 is_discoverable = data["discoverable"] || false
1240 invisible = data["invisible"] || false
1241 actor_type = data["type"] || "Person"
1244 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1245 data["publicKey"]["publicKeyPem"]
1251 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1252 data["endpoints"]["sharedInbox"]
1259 uri: get_actor_url(data["url"]),
1264 is_locked: is_locked,
1265 is_discoverable: is_discoverable,
1266 invisible: invisible,
1269 follower_address: data["followers"],
1270 following_address: data["following"],
1271 bio: data["summary"] || "",
1272 actor_type: actor_type,
1273 also_known_as: Map.get(data, "alsoKnownAs", []),
1274 public_key: public_key,
1275 inbox: data["inbox"],
1276 shared_inbox: shared_inbox,
1277 accepts_chat_messages: accepts_chat_messages
1280 # nickname can be nil because of virtual actors
1281 if data["preferredUsername"] do
1285 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1288 Map.put(user_data, :nickname, nil)
1292 def fetch_follow_information_for_user(user) do
1293 with {:ok, following_data} <-
1294 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
1297 {:ok, hide_follows} <- collection_private(following_data),
1298 {:ok, followers_data} <-
1299 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
1300 {:ok, hide_followers} <- collection_private(followers_data) do
1303 hide_follows: hide_follows,
1304 follower_count: normalize_counter(followers_data["totalItems"]),
1305 following_count: normalize_counter(following_data["totalItems"]),
1306 hide_followers: hide_followers
1309 {:error, _} = e -> e
# Normalizes a `totalItems` value taken from a fetched collection into a
# counter. Only non-negative integers are trusted; anything else (nil, strings,
# floats, negative numbers) collapses to 0 so that malformed remote data never
# ends up as a follower/following count.
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1317 def maybe_update_follow_information(user_data) do
1318 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1319 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1321 {:collections_available,
1322 !!(user_data[:following_address] && user_data[:follower_address])},
1324 fetch_follow_information_for_user(user_data) do
1325 info = Map.merge(user_data[:info] || %{}, info)
1328 |> Map.put(:info, info)
1330 {:user_type_check, false} ->
1333 {:collections_available, false} ->
1336 {:enabled, false} ->
1341 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1348 defp collection_private(%{"first" => %{"type" => type}})
1349 when type in ["CollectionPage", "OrderedCollectionPage"],
1352 defp collection_private(%{"first" => first}) do
1353 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1354 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1357 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1358 {:error, _} = e -> e
1363 defp collection_private(_data), do: {:ok, true}
1365 def user_data_from_user_object(data) do
1366 with {:ok, data} <- MRF.filter(data) do
1367 {:ok, object_to_user_data(data)}
1373 def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
1374 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
1375 {:ok, data} <- user_data_from_user_object(data) do
1376 {:ok, maybe_update_follow_information(data)}
1378 # If this has been deleted, only log a debug and not an error
1379 {:error, "Object has been deleted" = e} ->
1380 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1383 {:error, {:reject, reason} = e} ->
1384 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1388 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1393 def maybe_handle_clashing_nickname(data) do
1394 with nickname when is_binary(nickname) <- data[:nickname],
1395 %User{} = old_user <- User.get_by_nickname(nickname),
1396 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1398 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1404 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1405 |> User.update_and_set_cache()
1407 {:ap_id_comparison, true} ->
1409 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1417 def make_user_from_ap_id(ap_id, opts \\ []) do
1418 user = User.get_cached_by_ap_id(ap_id)
1420 if user && !User.ap_enabled?(user) do
1421 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1423 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
1426 |> User.remote_user_changeset(data)
1427 |> User.update_and_set_cache()
1429 maybe_handle_clashing_nickname(data)
1432 |> User.remote_user_changeset()
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from the
AP id found in the result.

Returns whatever `make_user_from_ap_id/1` returns when WebFinger yields a
non-nil `"ap_id"`, and `{:error, "No AP id in WebFinger"}` for any other
WebFinger outcome.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _other -> {:error, "No AP id in WebFinger"}
  end
end
# Filter out broken threads: delegates the decision to
# `entire_thread_visible_for_user?/2` for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processes a specific activity on behalf of `user`.

Currently this only runs the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1458 def fetch_direct_messages_query do
1460 |> restrict_type(%{type: "Create"})
1461 |> restrict_visibility(%{visibility: "direct"})
1462 |> order_by([activity], asc: activity.id)