1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Checks the object's "content" against the `[:instance, :remote_limit]` setting.
# Returns `true` when the content is no longer than the limit, and also when
# the data carries no (non-nil) object content to measure.
defp check_remote_limit(data) do
  case data do
    %{"object" => %{"content" => content}} when not is_nil(content) ->
      max_length = Config.get([:instance, :remote_limit])
      String.length(content) <= max_length

    _ ->
      true
  end
end
@doc """
Bumps the actor's note count via `User.increase_note_count/1` when `object`
is public; otherwise returns `{:ok, actor}` without touching the counter.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Lowers the actor's note count via `User.decrease_note_count/1` when `object`
is public; otherwise returns `{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Event]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 {:ok, activity, meta}
109 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
110 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
111 with nil <- Activity.normalize(map),
112 map <- lazy_put_activity_defaults(map, fake),
113 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
114 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
115 {:ok, map} <- MRF.filter(map),
116 {recipients, _, _} = get_recipients(map),
117 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
118 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
119 {:ok, map, object} <- insert_full_object(map) do
125 recipients: recipients
128 |> maybe_create_activity_expiration()
130 # Splice in the child object if we have one.
131 activity = Maps.put_if_present(activity, :object, object)
133 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
137 %Activity{} = activity ->
140 {:fake, true, map, recipients} ->
141 activity = %Activity{
145 recipients: recipients,
149 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
157 def notify_and_stream(activity) do
158 Notification.create_notifications(activity)
160 conversation = create_or_bump_conversation(activity, activity.actor)
161 participations = get_participations(conversation)
163 stream_out_participations(participations)
166 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
168 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
169 activity_id: activity.id,
170 expires_at: expires_at
176 defp maybe_create_activity_expiration(result), do: result
178 defp create_or_bump_conversation(activity, actor) do
179 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
180 %User{} = user <- User.get_cached_by_ap_id(actor) do
181 Participation.mark_as_read(user, conversation)
186 defp get_participations({:ok, conversation}) do
188 |> Repo.preload(:participations, force: true)
189 |> Map.get(:participations)
192 defp get_participations(_), do: []
194 def stream_out_participations(participations) do
197 |> Repo.preload(:user)
199 Streamer.stream("participation", participations)
202 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
203 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
204 conversation = Repo.preload(conversation, :participations)
207 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
212 if last_activity_id do
213 stream_out_participations(conversation.participations)
218 def stream_out_participations(_, _), do: :noop
220 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
221 when data_type in ["Create", "Announce", "Delete"] do
223 |> Topics.get_activity_topics()
224 |> Streamer.stream(activity)
227 def stream_out(_activity) do
231 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
232 def create(params, fake \\ false) do
233 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
238 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
239 additional = params[:additional] || %{}
240 # only accept false as false value
241 local = !(params[:local] == false)
242 published = params[:published]
243 quick_insert? = Config.get([:env]) == :benchmark
247 %{to: to, actor: actor, published: published, context: context, object: object},
251 with {:ok, activity} <- insert(create_data, local, fake),
252 {:fake, false, activity} <- {:fake, fake, activity},
253 _ <- increase_replies_count_if_reply(create_data),
254 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
255 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
256 _ <- notify_and_stream(activity),
257 :ok <- maybe_federate(activity) do
260 {:quick_insert, true, activity} ->
263 {:fake, true, activity} ->
267 Repo.rollback(message)
271 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
272 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
273 additional = params[:additional] || %{}
274 # only accept false as false value
275 local = !(params[:local] == false)
276 published = params[:published]
280 %{to: to, actor: actor, published: published, context: context, object: object},
284 with {:ok, activity} <- insert(listen_data, local),
285 _ <- notify_and_stream(activity),
286 :ok <- maybe_federate(activity) do
291 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
292 {:ok, Activity.t()} | nil | {:error, any()}
293 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
294 with {:ok, result} <-
295 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
300 defp do_unfollow(follower, followed, activity_id, local) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_federate(activity) do
310 {:error, error} -> Repo.rollback(error)
314 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
324 # only accept false as false value
325 local = !(params[:local] == false)
326 forward = !(params[:forward] == false)
328 additional = params[:additional] || %{}
332 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
334 Map.merge(additional, %{"to" => [], "cc" => []})
337 with flag_data <- make_flag_data(params, additional),
338 {:ok, activity} <- insert(flag_data, local),
339 {:ok, stripped_activity} <- strip_report_status_data(activity),
340 _ <- notify_and_stream(activity),
341 :ok <- maybe_federate(stripped_activity) do
342 User.all_superusers()
343 |> Enum.filter(fn user -> not is_nil(user.email) end)
344 |> Enum.each(fn superuser ->
346 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
347 |> Pleroma.Emails.Mailer.deliver_async()
354 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
355 def move(%User{} = origin, %User{} = target, local \\ true) do
358 "actor" => origin.ap_id,
359 "object" => origin.ap_id,
360 "target" => target.ap_id
363 with true <- origin.ap_id in target.also_known_as,
364 {:ok, activity} <- insert(params, local),
365 _ <- notify_and_stream(activity) do
366 maybe_federate(activity)
368 BackgroundWorker.enqueue("move_following", %{
369 "origin_id" => origin.id,
370 "target_id" => target.id
375 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
380 def fetch_activities_for_context_query(context, opts) do
381 public = [Constants.as_public()]
385 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
388 from(activity in Activity)
389 |> maybe_preload_objects(opts)
390 |> maybe_preload_bookmarks(opts)
391 |> maybe_set_thread_muted_field(opts)
392 |> restrict_blocked(opts)
393 |> restrict_recipients(recipients, opts[:user])
394 |> restrict_filtered(opts)
398 "?->>'type' = ? and ?->>'context' = ?",
405 |> exclude_poll_votes(opts)
407 |> order_by([activity], desc: activity.id)
410 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
411 def fetch_activities_for_context(context, opts \\ %{}) do
413 |> fetch_activities_for_context_query(opts)
417 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
418 FlakeId.Ecto.CompatType.t() | nil
419 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
421 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
422 |> restrict_visibility(%{visibility: "direct"})
428 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
429 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
430 opts = Map.delete(opts, :user)
432 [Constants.as_public()]
433 |> fetch_activities_query(opts)
434 |> restrict_unlisted(opts)
435 |> Pagination.fetch_paginated(opts, pagination)
438 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
439 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
441 |> Map.put(:restrict_unlisted, true)
442 |> fetch_public_or_unlisted_activities(pagination)
445 @valid_visibilities ~w[direct unlisted public private]
447 defp restrict_visibility(query, %{visibility: visibility})
448 when is_list(visibility) do
449 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
454 "activity_visibility(?, ?, ?) = ANY (?)",
462 Logger.error("Could not restrict visibility to #{visibility}")
466 defp restrict_visibility(query, %{visibility: visibility})
467 when visibility in @valid_visibilities do
471 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
475 defp restrict_visibility(_query, %{visibility: visibility})
476 when visibility not in @valid_visibilities do
477 Logger.error("Could not restrict visibility to #{visibility}")
480 defp restrict_visibility(query, _visibility), do: query
482 defp exclude_visibility(query, %{exclude_visibilities: visibility})
483 when is_list(visibility) do
484 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 "activity_visibility(?, ?, ?) = ANY (?)",
497 Logger.error("Could not exclude visibility to #{visibility}")
502 defp exclude_visibility(query, %{exclude_visibilities: visibility})
503 when visibility in @valid_visibilities do
508 "activity_visibility(?, ?, ?) = ?",
517 defp exclude_visibility(query, %{exclude_visibilities: visibility})
518 when visibility not in [nil | @valid_visibilities] do
519 Logger.error("Could not exclude visibility to #{visibility}")
523 defp exclude_visibility(query, _visibility), do: query
525 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
528 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
531 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
534 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
538 defp restrict_thread_visibility(query, _, _), do: query
540 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
543 |> Map.put(:user, reading_user)
544 |> Map.put(:actor_id, user.ap_id)
547 godmode: params[:godmode],
548 reading_user: reading_user
550 |> user_activities_recipients()
551 |> fetch_activities(params)
555 def fetch_user_activities(user, reading_user, params \\ %{}) do
558 |> Map.put(:type, ["Create", "Announce"])
559 |> Map.put(:user, reading_user)
560 |> Map.put(:actor_id, user.ap_id)
561 |> Map.put(:pinned_activity_ids, user.pinned_activities)
564 if User.blocks?(reading_user, user) do
568 |> Map.put(:blocking_user, reading_user)
569 |> Map.put(:muting_user, reading_user)
573 godmode: params[:godmode],
574 reading_user: reading_user
576 |> user_activities_recipients()
577 |> fetch_activities(params)
581 def fetch_statuses(reading_user, params) do
582 params = Map.put(params, :type, ["Create", "Announce"])
585 godmode: params[:godmode],
586 reading_user: reading_user
588 |> user_activities_recipients()
589 |> fetch_activities(params, :offset)
593 defp user_activities_recipients(%{godmode: true}), do: []
595 defp user_activities_recipients(%{reading_user: reading_user}) do
597 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
599 [Constants.as_public()]
603 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
604 raise "Can't use the child object without preloading!"
607 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
609 [activity, object] in query,
612 "?->>'type' != ? or ?->>'actor' != ?",
621 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
# A missing or empty-string `:since_id` leaves the query untouched.
defp restrict_since(query, %{since_id: since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
631 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
632 raise "Can't use the child object without preloading!"
635 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
637 [_activity, object] in query,
638 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
642 defp restrict_tag_reject(query, _), do: query
644 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
648 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
650 [_activity, object] in query,
651 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
655 defp restrict_tag_all(query, _), do: query
657 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
658 raise "Can't use the child object without preloading!"
661 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
663 [_activity, object] in query,
664 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
668 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
670 [_activity, object] in query,
671 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
675 defp restrict_tag(query, _), do: query
677 defp restrict_recipients(query, [], _user), do: query
679 defp restrict_recipients(query, recipients, nil) do
680 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
683 defp restrict_recipients(query, recipients, user) do
686 where: fragment("? && ?", ^recipients, activity.recipients),
687 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities whose `local` column is true;
# any other options return the query unchanged.
defp restrict_local(query, opts) do
  case opts do
    %{local_only: true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
# Narrows the query to activities whose `actor` equals the `:actor_id` option.
# Without that option the query is returned as-is.
defp restrict_actor(query, %{actor_id: actor}) do
  from(
    a in query,
    where: a.actor == ^actor
  )
end

defp restrict_actor(query, _opts), do: query
# Filters on the activity's "type" field. A single string is compared for
# equality; any other value is handed to `ANY(?)` (callers in this module pass
# lists of type names). Without a `:type` option the query is unchanged.
defp restrict_type(query, %{type: type}) do
  if is_binary(type) do
    from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))
  else
    from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^type))
  end
end

defp restrict_type(query, _opts), do: query
# Keeps only activities whose "state" field in `data` equals the `:state`
# option. Without that option the query is returned untouched.
defp restrict_state(query, %{state: wanted_state}) do
  from(
    a in query,
    where: fragment("?->>'state' = ?", a.data, ^wanted_state)
  )
end

defp restrict_state(query, _opts), do: query
719 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
721 [_activity, object] in query,
722 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
726 defp restrict_favorited_by(query, _), do: query
728 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
729 raise "Can't use the child object without preloading!"
732 defp restrict_media(query, %{only_media: true}) do
734 [activity, object] in query,
735 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
736 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
740 defp restrict_media(query, _), do: query
742 defp restrict_replies(query, %{exclude_replies: true}) do
744 [_activity, object] in query,
745 where: fragment("?->>'inReplyTo' is null", object.data)
749 defp restrict_replies(query, %{
750 reply_filtering_user: user,
751 reply_visibility: "self"
754 [activity, object] in query,
757 "?->>'inReplyTo' is null OR ? = ANY(?)",
765 defp restrict_replies(query, %{
766 reply_filtering_user: user,
767 reply_visibility: "following"
770 [activity, object] in query,
773 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
775 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
784 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose "type" is 'Announce';
# any other options leave the query unchanged.
defp restrict_reblogs(query, opts) do
  case opts do
    %{exclude_reblogs: true} ->
      from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

    _ ->
      query
  end
end
792 defp restrict_muted(query, %{with_muted: true}), do: query
794 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
795 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
798 from([activity] in query,
799 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
800 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
803 unless opts[:skip_preload] do
804 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
810 defp restrict_muted(query, _), do: query
812 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
813 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
814 domain_blocks = user.domain_blocks || []
816 following_ap_ids = User.get_friends_ap_ids(user)
819 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
822 [activity, object: o] in query,
823 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
824 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
827 "recipients_contain_blocked_domains(?, ?) = false",
833 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
840 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
848 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
857 defp restrict_blocked(query, _), do: query
859 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
864 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
866 ^[Constants.as_public()]
871 defp restrict_unlisted(query, _), do: query
# When `pinned: true` is given together with `:pinned_activity_ids`, limits the
# query to activities whose id is in that list. Otherwise a no-op.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  from(
    a in query,
    where: a.id in ^pinned_ids
  )
end

defp restrict_pinned(query, _opts), do: query
879 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
880 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
886 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
894 defp restrict_muted_reblogs(query, _), do: query
896 defp restrict_instance(query, %{instance: instance}) do
901 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
905 from(activity in query, where: activity.actor in ^users)
908 defp restrict_instance(query, _), do: query
910 defp restrict_filtered(query, %{user: %User{} = user}) do
911 case Filter.compose_regex(user) do
916 from([activity, object] in query,
918 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
919 activity.actor == ^user.ap_id
924 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
925 restrict_filtered(query, %{user: user})
928 defp restrict_filtered(query, _), do: query
930 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
932 defp exclude_poll_votes(query, _) do
933 if has_named_binding?(query, :object) do
934 from([activity, object: o] in query,
935 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
942 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
944 defp exclude_chat_messages(query, _) do
945 if has_named_binding?(query, :object) do
946 from([activity, object: o] in query,
947 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
954 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
956 defp exclude_invisible_actors(query, _opts) do
958 User.Query.build(%{invisible: true, select: [:ap_id]})
960 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
962 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Removes the activity with the given `:exclude_id` from the results.
# Only applies when the id is a binary; anything else leaves the query as-is.
defp exclude_id(query, %{exclude_id: excluded}) when is_binary(excluded) do
  from(
    a in query,
    where: a.id != ^excluded
  )
end

defp exclude_id(query, _opts), do: query
971 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
973 defp maybe_preload_objects(query, _) do
975 |> Activity.with_preloaded_object()
978 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
980 defp maybe_preload_bookmarks(query, opts) do
982 |> Activity.with_preloaded_bookmark(opts[:user])
985 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
987 |> Activity.with_preloaded_report_notes()
990 defp maybe_preload_report_notes(query, _), do: query
992 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
994 defp maybe_set_thread_muted_field(query, opts) do
996 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
999 defp maybe_order(query, %{order: :desc}) do
1001 |> order_by(desc: :id)
1004 defp maybe_order(query, %{order: :asc}) do
1006 |> order_by(asc: :id)
1009 defp maybe_order(query, _), do: query
1011 defp fetch_activities_query_ap_ids_ops(opts) do
1012 source_user = opts[:muting_user]
1013 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1015 ap_id_relationships =
1016 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1017 [:block | ap_id_relationships]
1022 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1024 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1025 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1027 restrict_muted_reblogs_opts =
1028 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1030 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1033 def fetch_activities_query(recipients, opts \\ %{}) do
1034 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1035 fetch_activities_query_ap_ids_ops(opts)
1038 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1042 |> maybe_preload_objects(opts)
1043 |> maybe_preload_bookmarks(opts)
1044 |> maybe_preload_report_notes(opts)
1045 |> maybe_set_thread_muted_field(opts)
1046 |> maybe_order(opts)
1047 |> restrict_recipients(recipients, opts[:user])
1048 |> restrict_replies(opts)
1049 |> restrict_tag(opts)
1050 |> restrict_tag_reject(opts)
1051 |> restrict_tag_all(opts)
1052 |> restrict_since(opts)
1053 |> restrict_local(opts)
1054 |> restrict_actor(opts)
1055 |> restrict_type(opts)
1056 |> restrict_state(opts)
1057 |> restrict_favorited_by(opts)
1058 |> restrict_blocked(restrict_blocked_opts)
1059 |> restrict_muted(restrict_muted_opts)
1060 |> restrict_filtered(opts)
1061 |> restrict_media(opts)
1062 |> restrict_visibility(opts)
1063 |> restrict_thread_visibility(opts, config)
1064 |> restrict_reblogs(opts)
1065 |> restrict_pinned(opts)
1066 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1067 |> restrict_instance(opts)
1068 |> restrict_announce_object_actor(opts)
1069 |> restrict_filtered(opts)
1070 |> Activity.restrict_deactivated_users()
1071 |> exclude_poll_votes(opts)
1072 |> exclude_chat_messages(opts)
1073 |> exclude_invisible_actors(opts)
1074 |> exclude_visibility(opts)
1077 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1078 list_memberships = Pleroma.List.memberships(opts[:user])
1080 fetch_activities_query(recipients ++ list_memberships, opts)
1081 |> Pagination.fetch_paginated(opts, pagination)
1083 |> maybe_update_cc(list_memberships, opts[:user])
1087 Fetches the activities a user has favourited, ordered by when they were added to favourites.
1089 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1090 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1092 |> Activity.Queries.by_actor()
1093 |> Activity.Queries.by_type("Like")
1094 |> Activity.with_joined_object()
1095 |> Object.with_joined_activity()
1096 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1097 |> order_by([like, _, _], desc_nulls_last: like.id)
1098 |> Pagination.fetch_paginated(
1099 Map.merge(params, %{skip_order: true}),
1104 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1105 Enum.map(activities, fn
1106 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1107 if Enum.any?(bcc, &(&1 in list_memberships)) do
1108 update_in(activity.data["cc"], &[user_ap_id | &1])
1118 defp maybe_update_cc(activities, _, _), do: activities
1120 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1121 from(activity in query,
1123 fragment("? && ?", activity.recipients, ^recipients) or
1124 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1125 ^Constants.as_public() in activity.recipients)
1129 def fetch_activities_bounded(
1131 recipients_with_public,
1133 pagination \\ :keyset
1135 fetch_activities_query([], opts)
1136 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1137 |> Pagination.fetch_paginated(opts, pagination)
1141 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1142 def upload(file, opts \\ []) do
1143 with {:ok, data} <- Upload.store(file, opts) do
1144 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1146 Repo.insert(%Object{data: obj_data})
1150 @spec get_actor_url(any()) :: binary() | nil
1151 defp get_actor_url(url) when is_binary(url), do: url
1152 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1154 defp get_actor_url(url) when is_list(url) do
1160 defp get_actor_url(_url), do: nil
1162 defp object_to_user_data(data) do
1164 data["icon"]["url"] &&
1167 "url" => [%{"href" => data["icon"]["url"]}]
1171 data["image"]["url"] &&
1174 "url" => [%{"href" => data["image"]["url"]}]
1179 |> Map.get("attachment", [])
1180 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1181 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1185 |> Map.get("tag", [])
1187 %{"type" => "Emoji"} -> true
1190 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1191 {String.trim(name, ":"), url}
1194 locked = data["manuallyApprovesFollowers"] || false
1195 capabilities = data["capabilities"] || %{}
1196 accepts_chat_messages = capabilities["acceptsChatMessages"]
1197 data = Transmogrifier.maybe_fix_user_object(data)
1198 discoverable = data["discoverable"] || false
1199 invisible = data["invisible"] || false
1200 actor_type = data["type"] || "Person"
1203 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1204 data["publicKey"]["publicKeyPem"]
1210 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1211 data["endpoints"]["sharedInbox"]
1218 uri: get_actor_url(data["url"]),
1224 discoverable: discoverable,
1225 invisible: invisible,
1228 follower_address: data["followers"],
1229 following_address: data["following"],
1230 bio: data["summary"] || "",
1231 actor_type: actor_type,
1232 also_known_as: Map.get(data, "alsoKnownAs", []),
1233 public_key: public_key,
1234 inbox: data["inbox"],
1235 shared_inbox: shared_inbox,
1236 accepts_chat_messages: accepts_chat_messages
1239 # nickname can be nil because of virtual actors
1240 if data["preferredUsername"] do
1244 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1247 Map.put(user_data, :nickname, nil)
1251 def fetch_follow_information_for_user(user) do
1252 with {:ok, following_data} <-
1253 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1254 {:ok, hide_follows} <- collection_private(following_data),
1255 {:ok, followers_data} <-
1256 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1257 {:ok, hide_followers} <- collection_private(followers_data) do
1260 hide_follows: hide_follows,
1261 follower_count: normalize_counter(followers_data["totalItems"]),
1262 following_count: normalize_counter(following_data["totalItems"]),
1263 hide_followers: hide_followers
1266 {:error, _} = e -> e
# Returns `counter` unchanged when it is an integer, and 0 for any other
# value (nil, strings, ...).
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1274 def maybe_update_follow_information(user_data) do
1275 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1276 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1278 {:collections_available,
1279 !!(user_data[:following_address] && user_data[:follower_address])},
1281 fetch_follow_information_for_user(user_data) do
1282 info = Map.merge(user_data[:info] || %{}, info)
1285 |> Map.put(:info, info)
1287 {:user_type_check, false} ->
1290 {:collections_available, false} ->
1293 {:enabled, false} ->
1298 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1305 defp collection_private(%{"first" => %{"type" => type}})
1306 when type in ["CollectionPage", "OrderedCollectionPage"],
1309 defp collection_private(%{"first" => first}) do
1310 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1311 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1314 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1315 {:error, _} = e -> e
1320 defp collection_private(_data), do: {:ok, true}
1322 def user_data_from_user_object(data) do
1323 with {:ok, data} <- MRF.filter(data) do
1324 {:ok, object_to_user_data(data)}
1330 def fetch_and_prepare_user_from_ap_id(ap_id) do
1331 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1332 {:ok, data} <- user_data_from_user_object(data) do
1333 {:ok, maybe_update_follow_information(data)}
1335 {:error, "Object has been deleted" = e} ->
1336 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1339 {:error, {:reject, reason} = e} ->
1340 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1344 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1349 def maybe_handle_clashing_nickname(data) do
1350 with nickname when is_binary(nickname) <- data[:nickname],
1351 %User{} = old_user <- User.get_by_nickname(nickname),
1352 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1354 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1360 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1361 |> User.update_and_set_cache()
1363 {:ap_id_comparison, true} ->
1365 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1373 def make_user_from_ap_id(ap_id) do
1374 user = User.get_cached_by_ap_id(ap_id)
1376 if user && !User.ap_enabled?(user) do
1377 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1379 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1382 |> User.remote_user_changeset(data)
1383 |> User.update_and_set_cache()
1385 maybe_handle_clashing_nickname(data)
1388 |> User.remote_user_changeset()
@doc """
Resolves `nickname` through `WebFinger.finger/1` and, when the result carries a
non-nil `"ap_id"`, passes it on to `make_user_from_ap_id/1`.

Any other WebFinger result yields `{:error, "No AP id in WebFinger"}`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Broken-thread filter: the answer is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(activity = %Activity{}, user = %User{}) do
  entire_thread_visible_for_user?(activity, user)
end
@doc """
Post-processing check for a single activity as seen by `user`.

Currently this only applies the broken-thread filter and returns its result.
"""
def contain_activity(activity = %Activity{}, user = %User{}) do
  contain_broken_threads(activity, user)
end
1414 def fetch_direct_messages_query do
1416 |> restrict_type(%{type: "Create"})
1417 |> restrict_visibility(%{visibility: "direct"})
1418 |> order_by([activity], asc: activity.id)