1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
130 %Activity{} = activity ->
136 {:containment, _} = error ->
139 {:error, _} = error ->
142 {:fake, true, map, recipients} ->
143 activity = %Activity{
147 recipients: recipients,
151 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
154 {:remote_limit_pass, _} ->
155 {:error, :remote_limit}
162 defp insert_activity_with_expiration(data, local, recipients) do
166 actor: data["actor"],
167 recipients: recipients
170 with {:ok, activity} <- Repo.insert(struct) do
171 maybe_create_activity_expiration(activity)
175 def notify_and_stream(activity) do
176 Notification.create_notifications(activity)
178 conversation = create_or_bump_conversation(activity, activity.actor)
179 participations = get_participations(conversation)
181 stream_out_participations(participations)
184 defp maybe_create_activity_expiration(
185 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
189 activity_id: activity.id,
190 expires_at: expires_at
196 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
198 defp create_or_bump_conversation(activity, actor) do
199 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
200 %User{} = user <- User.get_cached_by_ap_id(actor) do
201 Participation.mark_as_read(user, conversation)
206 defp get_participations({:ok, conversation}) do
208 |> Repo.preload(:participations, force: true)
209 |> Map.get(:participations)
212 defp get_participations(_), do: []
214 def stream_out_participations(participations) do
217 |> Repo.preload(:user)
219 Streamer.stream("participation", participations)
222 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
223 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
224 conversation = Repo.preload(conversation, :participations)
227 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
232 if last_activity_id do
233 stream_out_participations(conversation.participations)
238 def stream_out_participations(_, _), do: :noop
240 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
241 when data_type in ["Create", "Announce", "Delete"] do
243 |> Topics.get_activity_topics()
244 |> Streamer.stream(activity)
247 def stream_out(_activity) do
251 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
252 def create(params, fake \\ false) do
253 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
258 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
259 additional = params[:additional] || %{}
260 # only accept false as false value
261 local = !(params[:local] == false)
262 published = params[:published]
263 quick_insert? = Config.get([:env]) == :benchmark
267 %{to: to, actor: actor, published: published, context: context, object: object},
271 with {:ok, activity} <- insert(create_data, local, fake),
272 {:fake, false, activity} <- {:fake, fake, activity},
273 _ <- increase_replies_count_if_reply(create_data),
274 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
275 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
276 _ <- notify_and_stream(activity),
277 :ok <- maybe_federate(activity) do
280 {:quick_insert, true, activity} ->
283 {:fake, true, activity} ->
287 Repo.rollback(message)
291 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
292 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
293 additional = params[:additional] || %{}
294 # only accept false as false value
295 local = !(params[:local] == false)
296 published = params[:published]
300 %{to: to, actor: actor, published: published, context: context, object: object},
304 with {:ok, activity} <- insert(listen_data, local),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_federate(activity) do
311 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
312 {:ok, Activity.t()} | nil | {:error, any()}
313 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
314 with {:ok, result} <-
315 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
320 defp do_unfollow(follower, followed, activity_id, local) do
321 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
322 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
323 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
324 {:ok, activity} <- insert(unfollow_data, local),
325 _ <- notify_and_stream(activity),
326 :ok <- maybe_federate(activity) do
330 {:error, error} -> Repo.rollback(error)
334 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
344 # only accept false as false value
345 local = !(params[:local] == false)
346 forward = !(params[:forward] == false)
348 additional = params[:additional] || %{}
352 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
354 Map.merge(additional, %{"to" => [], "cc" => []})
357 with flag_data <- make_flag_data(params, additional),
358 {:ok, activity} <- insert(flag_data, local),
359 {:ok, stripped_activity} <- strip_report_status_data(activity),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(stripped_activity) do
362 User.all_superusers()
363 |> Enum.filter(fn user -> not is_nil(user.email) end)
364 |> Enum.each(fn superuser ->
366 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
367 |> Pleroma.Emails.Mailer.deliver_async()
374 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
375 def move(%User{} = origin, %User{} = target, local \\ true) do
378 "actor" => origin.ap_id,
379 "object" => origin.ap_id,
380 "target" => target.ap_id
383 with true <- origin.ap_id in target.also_known_as,
384 {:ok, activity} <- insert(params, local),
385 _ <- notify_and_stream(activity) do
386 maybe_federate(activity)
388 BackgroundWorker.enqueue("move_following", %{
389 "origin_id" => origin.id,
390 "target_id" => target.id
395 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
400 def fetch_activities_for_context_query(context, opts) do
401 public = [Constants.as_public()]
405 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
408 from(activity in Activity)
409 |> maybe_preload_objects(opts)
410 |> maybe_preload_bookmarks(opts)
411 |> maybe_set_thread_muted_field(opts)
412 |> restrict_blocked(opts)
413 |> restrict_recipients(recipients, opts[:user])
414 |> restrict_filtered(opts)
418 "?->>'type' = ? and ?->>'context' = ?",
425 |> exclude_poll_votes(opts)
427 |> order_by([activity], desc: activity.id)
430 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
431 def fetch_activities_for_context(context, opts \\ %{}) do
433 |> fetch_activities_for_context_query(opts)
437 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
438 FlakeId.Ecto.CompatType.t() | nil
439 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
441 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
442 |> restrict_visibility(%{visibility: "direct"})
448 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
449 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
450 opts = Map.delete(opts, :user)
452 [Constants.as_public()]
453 |> fetch_activities_query(opts)
454 |> restrict_unlisted(opts)
455 |> Pagination.fetch_paginated(opts, pagination)
458 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
459 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
461 |> Map.put(:restrict_unlisted, true)
462 |> fetch_public_or_unlisted_activities(pagination)
465 @valid_visibilities ~w[direct unlisted public private]
467 defp restrict_visibility(query, %{visibility: visibility})
468 when is_list(visibility) do
469 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
474 "activity_visibility(?, ?, ?) = ANY (?)",
482 Logger.error("Could not restrict visibility to #{visibility}")
486 defp restrict_visibility(query, %{visibility: visibility})
487 when visibility in @valid_visibilities do
491 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
495 defp restrict_visibility(_query, %{visibility: visibility})
496 when visibility not in @valid_visibilities do
497 Logger.error("Could not restrict visibility to #{visibility}")
500 defp restrict_visibility(query, _visibility), do: query
502 defp exclude_visibility(query, %{exclude_visibilities: visibility})
503 when is_list(visibility) do
504 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
509 "activity_visibility(?, ?, ?) = ANY (?)",
517 Logger.error("Could not exclude visibility to #{visibility}")
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when visibility in @valid_visibilities do
528 "activity_visibility(?, ?, ?) = ?",
537 defp exclude_visibility(query, %{exclude_visibilities: visibility})
538 when visibility not in [nil | @valid_visibilities] do
539 Logger.error("Could not exclude visibility to #{visibility}")
543 defp exclude_visibility(query, _visibility), do: query
545 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
548 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
551 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
554 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
558 defp restrict_thread_visibility(query, _, _), do: query
560 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
563 |> Map.put(:user, reading_user)
564 |> Map.put(:actor_id, user.ap_id)
567 godmode: params[:godmode],
568 reading_user: reading_user
570 |> user_activities_recipients()
571 |> fetch_activities(params)
575 def fetch_user_activities(user, reading_user, params \\ %{}) do
578 |> Map.put(:type, ["Create", "Announce"])
579 |> Map.put(:user, reading_user)
580 |> Map.put(:actor_id, user.ap_id)
581 |> Map.put(:pinned_activity_ids, user.pinned_activities)
584 if User.blocks?(reading_user, user) do
588 |> Map.put(:blocking_user, reading_user)
589 |> Map.put(:muting_user, reading_user)
593 godmode: params[:godmode],
594 reading_user: reading_user
596 |> user_activities_recipients()
597 |> fetch_activities(params)
601 def fetch_statuses(reading_user, params) do
602 params = Map.put(params, :type, ["Create", "Announce"])
605 godmode: params[:godmode],
606 reading_user: reading_user
608 |> user_activities_recipients()
609 |> fetch_activities(params, :offset)
613 defp user_activities_recipients(%{godmode: true}), do: []
615 defp user_activities_recipients(%{reading_user: reading_user}) do
617 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
619 [Constants.as_public()]
623 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
624 raise "Can't use the child object without preloading!"
627 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
629 [activity, object] in query,
632 "?->>'type' != ? or ?->>'actor' != ?",
641 defp restrict_announce_object_actor(query, _), do: query
643 defp restrict_since(query, %{since_id: ""}), do: query
645 defp restrict_since(query, %{since_id: since_id}) do
646 from(activity in query, where: activity.id > ^since_id)
649 defp restrict_since(query, _), do: query
651 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
652 raise "Can't use the child object without preloading!"
655 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
657 [_activity, object] in query,
658 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
662 defp restrict_tag_reject(query, _), do: query
664 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
665 raise "Can't use the child object without preloading!"
668 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
670 [_activity, object] in query,
671 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
675 defp restrict_tag_all(query, _), do: query
677 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
678 raise "Can't use the child object without preloading!"
681 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
683 [_activity, object] in query,
684 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
688 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
690 [_activity, object] in query,
691 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
695 defp restrict_tag(query, _), do: query
697 defp restrict_recipients(query, [], _user), do: query
699 defp restrict_recipients(query, recipients, nil) do
700 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
703 defp restrict_recipients(query, recipients, user) do
706 where: fragment("? && ?", ^recipients, activity.recipients),
707 or_where: activity.actor == ^user.ap_id
711 defp restrict_local(query, %{local_only: true}) do
712 from(activity in query, where: activity.local == true)
715 defp restrict_local(query, _), do: query
717 defp restrict_actor(query, %{actor_id: actor_id}) do
718 from(activity in query, where: activity.actor == ^actor_id)
721 defp restrict_actor(query, _), do: query
723 defp restrict_type(query, %{type: type}) when is_binary(type) do
724 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
727 defp restrict_type(query, %{type: type}) do
728 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
731 defp restrict_type(query, _), do: query
733 defp restrict_state(query, %{state: state}) do
734 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
737 defp restrict_state(query, _), do: query
739 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
741 [_activity, object] in query,
742 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
746 defp restrict_favorited_by(query, _), do: query
748 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
749 raise "Can't use the child object without preloading!"
752 defp restrict_media(query, %{only_media: true}) do
754 [activity, object] in query,
755 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
756 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
760 defp restrict_media(query, _), do: query
762 defp restrict_replies(query, %{exclude_replies: true}) do
764 [_activity, object] in query,
765 where: fragment("?->>'inReplyTo' is null", object.data)
769 defp restrict_replies(query, %{
770 reply_filtering_user: %User{} = user,
771 reply_visibility: "self"
774 [activity, object] in query,
777 "?->>'inReplyTo' is null OR ? = ANY(?)",
785 defp restrict_replies(query, %{
786 reply_filtering_user: %User{} = user,
787 reply_visibility: "following"
790 [activity, object] in query,
794 ?->>'type' != 'Create' -- This isn't a Create
795 OR ?->>'inReplyTo' is null -- this isn't a reply
796 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
797 -- unless they are the author (because authors
798 -- are also part of the recipients). This leads
799 -- to a bug that self-replies by friends won't
801 OR ? = ? -- The actor is us
805 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
814 defp restrict_replies(query, _), do: query
816 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
817 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
820 defp restrict_reblogs(query, _), do: query
822 defp restrict_muted(query, %{with_muted: true}), do: query
824 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
825 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
828 from([activity] in query,
829 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
832 "not (?->'to' \\?| ?) or ? = ?",
840 unless opts[:skip_preload] do
841 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
847 defp restrict_muted(query, _), do: query
849 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
850 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
851 domain_blocks = user.domain_blocks || []
853 following_ap_ids = User.get_friends_ap_ids(user)
856 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
859 [activity, object: o] in query,
860 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
863 "((not (? && ?)) or ? = ?)",
871 "recipients_contain_blocked_domains(?, ?) = false",
877 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
884 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
892 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
901 defp restrict_blocked(query, _), do: query
903 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
908 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
910 ^[Constants.as_public()]
915 defp restrict_unlisted(query, _), do: query
917 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
918 from(activity in query, where: activity.id in ^ids)
921 defp restrict_pinned(query, _), do: query
923 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
924 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
930 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
938 defp restrict_muted_reblogs(query, _), do: query
940 defp restrict_instance(query, %{instance: instance}) do
945 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
949 from(activity in query, where: activity.actor in ^users)
952 defp restrict_instance(query, _), do: query
954 defp restrict_filtered(query, %{user: %User{} = user}) do
955 case Filter.compose_regex(user) do
960 from([activity, object] in query,
962 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
963 activity.actor == ^user.ap_id
968 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
969 restrict_filtered(query, %{user: user})
972 defp restrict_filtered(query, _), do: query
974 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
976 defp exclude_poll_votes(query, _) do
977 if has_named_binding?(query, :object) do
978 from([activity, object: o] in query,
979 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
986 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
988 defp exclude_chat_messages(query, _) do
989 if has_named_binding?(query, :object) do
990 from([activity, object: o] in query,
991 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
998 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1000 defp exclude_invisible_actors(query, _opts) do
1002 User.Query.build(%{invisible: true, select: [:ap_id]})
1004 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1006 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1009 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1010 from(activity in query, where: activity.id != ^id)
1013 defp exclude_id(query, _), do: query
1015 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1017 defp maybe_preload_objects(query, _) do
1019 |> Activity.with_preloaded_object()
1022 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1024 defp maybe_preload_bookmarks(query, opts) do
1026 |> Activity.with_preloaded_bookmark(opts[:user])
1029 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1031 |> Activity.with_preloaded_report_notes()
1034 defp maybe_preload_report_notes(query, _), do: query
1036 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1038 defp maybe_set_thread_muted_field(query, opts) do
1040 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1043 defp maybe_order(query, %{order: :desc}) do
1045 |> order_by(desc: :id)
1048 defp maybe_order(query, %{order: :asc}) do
1050 |> order_by(asc: :id)
1053 defp maybe_order(query, _), do: query
1055 defp fetch_activities_query_ap_ids_ops(opts) do
1056 source_user = opts[:muting_user]
1057 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1059 ap_id_relationships =
1060 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1061 [:block | ap_id_relationships]
1066 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1068 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1069 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1071 restrict_muted_reblogs_opts =
1072 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1074 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1077 def fetch_activities_query(recipients, opts \\ %{}) do
1078 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1079 fetch_activities_query_ap_ids_ops(opts)
1082 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1086 |> maybe_preload_objects(opts)
1087 |> maybe_preload_bookmarks(opts)
1088 |> maybe_preload_report_notes(opts)
1089 |> maybe_set_thread_muted_field(opts)
1090 |> maybe_order(opts)
1091 |> restrict_recipients(recipients, opts[:user])
1092 |> restrict_replies(opts)
1093 |> restrict_tag(opts)
1094 |> restrict_tag_reject(opts)
1095 |> restrict_tag_all(opts)
1096 |> restrict_since(opts)
1097 |> restrict_local(opts)
1098 |> restrict_actor(opts)
1099 |> restrict_type(opts)
1100 |> restrict_state(opts)
1101 |> restrict_favorited_by(opts)
1102 |> restrict_blocked(restrict_blocked_opts)
1103 |> restrict_muted(restrict_muted_opts)
1104 |> restrict_filtered(opts)
1105 |> restrict_media(opts)
1106 |> restrict_visibility(opts)
1107 |> restrict_thread_visibility(opts, config)
1108 |> restrict_reblogs(opts)
1109 |> restrict_pinned(opts)
1110 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1111 |> restrict_instance(opts)
1112 |> restrict_announce_object_actor(opts)
1113 |> restrict_filtered(opts)
1114 |> Activity.restrict_deactivated_users()
1115 |> exclude_poll_votes(opts)
1116 |> exclude_chat_messages(opts)
1117 |> exclude_invisible_actors(opts)
1118 |> exclude_visibility(opts)
1121 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1122 list_memberships = Pleroma.List.memberships(opts[:user])
1124 fetch_activities_query(recipients ++ list_memberships, opts)
1125 |> Pagination.fetch_paginated(opts, pagination)
1127 |> maybe_update_cc(list_memberships, opts[:user])
1131 Fetch favorites activities of user with order by sort adds to favorites
1133 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1134 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1136 |> Activity.Queries.by_actor()
1137 |> Activity.Queries.by_type("Like")
1138 |> Activity.with_joined_object()
1139 |> Object.with_joined_activity()
1140 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1141 |> order_by([like, _, _], desc_nulls_last: like.id)
1142 |> Pagination.fetch_paginated(
1143 Map.merge(params, %{skip_order: true}),
1148 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1149 Enum.map(activities, fn
1150 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1151 if Enum.any?(bcc, &(&1 in list_memberships)) do
1152 update_in(activity.data["cc"], &[user_ap_id | &1])
1162 defp maybe_update_cc(activities, _, _), do: activities
1164 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1165 from(activity in query,
1167 fragment("? && ?", activity.recipients, ^recipients) or
1168 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1169 ^Constants.as_public() in activity.recipients)
1173 def fetch_activities_bounded(
1175 recipients_with_public,
1177 pagination \\ :keyset
1179 fetch_activities_query([], opts)
1180 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1181 |> Pagination.fetch_paginated(opts, pagination)
1185 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1186 def upload(file, opts \\ []) do
1187 with {:ok, data} <- Upload.store(file, opts) do
1188 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1190 Repo.insert(%Object{data: obj_data})
1194 @spec get_actor_url(any()) :: binary() | nil
1195 defp get_actor_url(url) when is_binary(url), do: url
1196 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1198 defp get_actor_url(url) when is_list(url) do
1204 defp get_actor_url(_url), do: nil
1206 defp object_to_user_data(data) do
1208 data["icon"]["url"] &&
1211 "url" => [%{"href" => data["icon"]["url"]}]
1215 data["image"]["url"] &&
1218 "url" => [%{"href" => data["image"]["url"]}]
1223 |> Map.get("attachment", [])
1224 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1225 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1229 |> Map.get("tag", [])
1231 %{"type" => "Emoji"} -> true
1234 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1235 {String.trim(name, ":"), url}
1238 is_locked = data["manuallyApprovesFollowers"] || false
1239 capabilities = data["capabilities"] || %{}
1240 accepts_chat_messages = capabilities["acceptsChatMessages"]
1241 data = Transmogrifier.maybe_fix_user_object(data)
1242 is_discoverable = data["discoverable"] || false
1243 invisible = data["invisible"] || false
1244 actor_type = data["type"] || "Person"
1247 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1248 data["publicKey"]["publicKeyPem"]
1254 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1255 data["endpoints"]["sharedInbox"]
1262 uri: get_actor_url(data["url"]),
1267 is_locked: is_locked,
1268 is_discoverable: is_discoverable,
1269 invisible: invisible,
1272 follower_address: data["followers"],
1273 following_address: data["following"],
1274 bio: data["summary"] || "",
1275 actor_type: actor_type,
1276 also_known_as: Map.get(data, "alsoKnownAs", []),
1277 public_key: public_key,
1278 inbox: data["inbox"],
1279 shared_inbox: shared_inbox,
1280 accepts_chat_messages: accepts_chat_messages
1283 # nickname can be nil because of virtual actors
1284 if data["preferredUsername"] do
1288 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1291 Map.put(user_data, :nickname, nil)
1295 def fetch_follow_information_for_user(user) do
1296 with {:ok, following_data} <-
1297 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
1300 {:ok, hide_follows} <- collection_private(following_data),
1301 {:ok, followers_data} <-
1302 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
1303 {:ok, hide_followers} <- collection_private(followers_data) do
1306 hide_follows: hide_follows,
1307 follower_count: normalize_counter(followers_data["totalItems"]),
1308 following_count: normalize_counter(following_data["totalItems"]),
1309 hide_followers: hide_followers
1312 {:error, _} = e -> e
1317 defp normalize_counter(counter) when is_integer(counter), do: counter
1318 defp normalize_counter(_), do: 0
1320 def maybe_update_follow_information(user_data) do
1321 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1322 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1324 {:collections_available,
1325 !!(user_data[:following_address] && user_data[:follower_address])},
1327 fetch_follow_information_for_user(user_data) do
1328 info = Map.merge(user_data[:info] || %{}, info)
1331 |> Map.put(:info, info)
1333 {:user_type_check, false} ->
1336 {:collections_available, false} ->
1339 {:enabled, false} ->
1344 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1351 defp collection_private(%{"first" => %{"type" => type}})
1352 when type in ["CollectionPage", "OrderedCollectionPage"],
1355 defp collection_private(%{"first" => first}) do
1356 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1357 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1360 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1361 {:error, _} = e -> e
1366 defp collection_private(_data), do: {:ok, true}
1368 def user_data_from_user_object(data) do
1369 with {:ok, data} <- MRF.filter(data) do
1370 {:ok, object_to_user_data(data)}
1376 def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
1377 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
1378 {:ok, data} <- user_data_from_user_object(data) do
1379 {:ok, maybe_update_follow_information(data)}
1381 # If this has been deleted, only log a debug and not an error
1382 {:error, "Object has been deleted" = e} ->
1383 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1386 {:error, {:reject, reason} = e} ->
1387 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1391 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1396 def maybe_handle_clashing_nickname(data) do
1397 with nickname when is_binary(nickname) <- data[:nickname],
1398 %User{} = old_user <- User.get_by_nickname(nickname),
1399 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1401 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1407 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1408 |> User.update_and_set_cache()
1410 {:ap_id_comparison, true} ->
1412 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1420 def make_user_from_ap_id(ap_id, opts \\ []) do
1421 user = User.get_cached_by_ap_id(ap_id)
1423 if user && !User.ap_enabled?(user) do
1424 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1426 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
1429 |> User.remote_user_changeset(data)
1430 |> User.update_and_set_cache()
1432 maybe_handle_clashing_nickname(data)
1435 |> User.remote_user_changeset()
1443 def make_user_from_nickname(nickname) do
1444 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1445 make_user_from_ap_id(ap_id)
1447 _e -> {:error, "No AP id in WebFinger"}
1451 # filter out broken threads
1452 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1453 entire_thread_visible_for_user?(activity, user)
1456 # do post-processing on a specific activity
1457 def contain_activity(%Activity{} = activity, %User{} = user) do
1458 contain_broken_threads(activity, user)
1461 def fetch_direct_messages_query do
1463 |> restrict_type(%{type: "Create"})
1464 |> restrict_visibility(%{visibility: "direct"})
1465 |> order_by([activity], asc: activity.id)