1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
# Collects the recipients of a "Create" activity.
#
# Combines the "to", "cc" and "bcc" addressing lists with the actor and removes
# duplicates. Returns `{recipients, to, cc}`.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. Defaulting it to `[]` and then
  # wrapping it in a list would leave a stray `[]` element among the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end
# Recipient collection for any activity without a dedicated clause.
#
# Concatenates "to", "cc" and "bcc" in that order (duplicates are kept) and
# returns `{recipients, to, cc}`.
defp get_recipients(data) do
  [to, cc, bcc] = for key <- ["to", "cc", "bcc"], do: Map.get(data, key, [])
  {Enum.concat([to, cc, bcc]), to, cc}
end
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Checks the content of the embedded object against the configured
# `[:instance, :remote_limit]`.
#
# Returns `true` when the content length does not exceed the limit, and also
# when the activity carries no textual content to measure.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  # Guarding on `is_binary/1` (rather than only excluding nil) keeps non-string
  # content, which `String.length/1` cannot measure, from raising here; such
  # input falls through to the permissive clause below.
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
# Calls `User.increase_note_count/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` without touching the counter.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Calls `User.decrease_note_count/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` without touching the counter.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
# Runs an activity map through the insertion pipeline.
#
# Arguments:
#   * map - the activity data; the guard requires a map.
#   * local - forwarded to insert_activity_with_expiration/3 (default true).
#   * fake - when true the pipeline stops at the :fake step, before anything is
#     inserted, and an %Activity{} struct is assembled in the else branch
#     (default false).
#   * bypass_actor_check - when true, check_actor_is_active/1 is not consulted
#     (default false).
#
# Per the @spec the result is {:ok, activity} or {:error, reason}.
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
# The happy path only continues when Activity.normalize/1 returns nil; a
# returned %Activity{} is handled by the matching else branch below
# (presumably an activity that already exists - confirm).
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
# Plain match (=) rather than <-: a shape mismatch here raises instead of
# falling through to the else branches.
118 {recipients, _, _} = get_recipients(map),
# Tagged tuple so the else branch can recover both the map and recipients
# when building the fake activity.
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
130 %Activity{} = activity ->
136 {:containment, _} = error ->
139 {:error, _} = error ->
142 {:fake, true, map, recipients} ->
143 activity = %Activity{
147 recipients: recipients,
151 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# A failed check_remote_limit/1 is reported as {:error, :remote_limit}.
154 {:remote_limit_pass, _} ->
155 {:error, :remote_limit}
162 defp insert_activity_with_expiration(data, local, recipients) do
166 actor: data["actor"],
167 recipients: recipients
170 with {:ok, activity} <- Repo.insert(struct) do
171 maybe_create_activity_expiration(activity)
175 def notify_and_stream(activity) do
176 Notification.create_notifications(activity)
178 conversation = create_or_bump_conversation(activity, activity.actor)
179 participations = get_participations(conversation)
181 stream_out_participations(participations)
184 defp maybe_create_activity_expiration(
185 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
189 activity_id: activity.id,
190 expires_at: expires_at
196 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
198 defp create_or_bump_conversation(activity, actor) do
199 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
200 %User{} = user <- User.get_cached_by_ap_id(actor) do
201 Participation.mark_as_read(user, conversation)
206 defp get_participations({:ok, conversation}) do
208 |> Repo.preload(:participations, force: true)
209 |> Map.get(:participations)
212 defp get_participations(_), do: []
214 def stream_out_participations(participations) do
217 |> Repo.preload(:user)
219 Streamer.stream("participation", participations)
222 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
223 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
224 conversation = Repo.preload(conversation, :participations)
227 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
232 if last_activity_id do
233 stream_out_participations(conversation.participations)
238 def stream_out_participations(_, _), do: :noop
240 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
241 when data_type in ["Create", "Announce", "Delete"] do
243 |> Topics.get_activity_topics()
244 |> Streamer.stream(activity)
247 def stream_out(_activity) do
251 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
252 def create(params, fake \\ false) do
253 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Body of create/2; create/2 runs it inside Repo.transaction/1.
#
# Requires :to, :actor, :context and :object in params; :additional, :local and
# :published are optional. On the full happy path the activity is inserted,
# reply and note counters are updated, notifications/streams are triggered and
# the activity is handed to maybe_federate/1.
258 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
259 additional = params[:additional] || %{}
260 # only accept false as false value
261 local = !(params[:local] == false)
262 published = params[:published]
# In the :benchmark environment the pipeline is cut short after the reply
# counter update (see the :quick_insert step below).
263 quick_insert? = Config.get([:env]) == :benchmark
267 %{to: to, actor: actor, published: published, context: context, object: object},
271 with {:ok, activity} <- insert(create_data, local, fake),
# Fake activities skip every side effect that follows.
272 {:fake, false, activity} <- {:fake, fake, activity},
273 _ <- increase_replies_count_if_reply(create_data),
274 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
275 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
276 _ <- notify_and_stream(activity),
277 :ok <- maybe_federate(activity) do
280 {:quick_insert, true, activity} ->
283 {:fake, true, activity} ->
# Any other failure aborts the surrounding transaction.
287 Repo.rollback(message)
291 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
292 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
293 additional = params[:additional] || %{}
294 # only accept false as false value
295 local = !(params[:local] == false)
296 published = params[:published]
300 %{to: to, actor: actor, published: published, context: context, object: object},
304 with {:ok, activity} <- insert(listen_data, local),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_federate(activity) do
311 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
312 {:ok, Activity.t()} | nil | {:error, any()}
313 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
314 with {:ok, result} <-
315 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
320 defp do_unfollow(follower, followed, activity_id, local) do
321 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
322 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
323 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
324 {:ok, activity} <- insert(unfollow_data, local),
325 _ <- notify_and_stream(activity),
326 :ok <- maybe_federate(activity) do
330 {:error, error} -> Repo.rollback(error)
334 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
344 # only accept false as false value
345 local = !(params[:local] == false)
346 forward = !(params[:forward] == false)
348 additional = params[:additional] || %{}
352 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
354 Map.merge(additional, %{"to" => [], "cc" => []})
357 with flag_data <- make_flag_data(params, additional),
358 {:ok, activity} <- insert(flag_data, local),
359 {:ok, stripped_activity} <- strip_report_status_data(activity),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(stripped_activity) do
362 User.all_superusers()
363 |> Enum.filter(fn user -> not is_nil(user.email) end)
364 |> Enum.each(fn superuser ->
366 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
367 |> Pleroma.Emails.Mailer.deliver_async()
374 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
375 def move(%User{} = origin, %User{} = target, local \\ true) do
378 "actor" => origin.ap_id,
379 "object" => origin.ap_id,
380 "target" => target.ap_id
383 with true <- origin.ap_id in target.also_known_as,
384 {:ok, activity} <- insert(params, local),
385 _ <- notify_and_stream(activity) do
386 maybe_federate(activity)
388 BackgroundWorker.enqueue("move_following", %{
389 "origin_id" => origin.id,
390 "target_id" => target.id
395 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
400 def fetch_activities_for_context_query(context, opts) do
401 public = [Constants.as_public()]
405 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
408 from(activity in Activity)
409 |> maybe_preload_objects(opts)
410 |> maybe_preload_bookmarks(opts)
411 |> maybe_set_thread_muted_field(opts)
412 |> restrict_blocked(opts)
413 |> restrict_blockers_visibility(opts)
414 |> restrict_recipients(recipients, opts[:user])
415 |> restrict_filtered(opts)
419 "?->>'type' = ? and ?->>'context' = ?",
426 |> exclude_poll_votes(opts)
428 |> order_by([activity], desc: activity.id)
431 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
432 def fetch_activities_for_context(context, opts \\ %{}) do
434 |> fetch_activities_for_context_query(opts)
438 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
439 FlakeId.Ecto.CompatType.t() | nil
440 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
442 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
443 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection.
#
# Any `:user` entry in `opts` is dropped before the query is built, and the
# same stripped options are used for `restrict_unlisted/2` and pagination.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
# Same as `fetch_public_or_unlisted_activities/2`, but always sets
# `restrict_unlisted: true` in the options first.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only_opts, pagination)
end
# Visibility names accepted by restrict_visibility/2 and exclude_visibility/2.
466 @valid_visibilities ~w[direct unlisted public private]
# restrict_visibility/2 narrows a query by the :visibility option, comparing
# against the SQL function activity_visibility(actor, recipients, data).
#
# Clause 1: a list of visibilities; every element must be valid, and the
# activity's visibility must equal ANY of them.
468 defp restrict_visibility(query, %{visibility: visibility})
469 when is_list(visibility) do
470 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
475 "activity_visibility(?, ?, ?) = ANY (?)",
# NOTE(review): `visibility` is a list here, so the interpolation goes
# through String.Chars for lists; inspect/1 would log it more reliably.
483 Logger.error("Could not restrict visibility to #{visibility}")
# Clause 2: a single valid visibility, compared for equality.
487 defp restrict_visibility(query, %{visibility: visibility})
488 when visibility in @valid_visibilities do
492 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Clause 3: any other :visibility value is logged as an error.
# NOTE(review): unlike exclude_visibility/2, whose guard exempts nil, an
# explicit `visibility: nil` lands in this clause - confirm that is intended.
496 defp restrict_visibility(_query, %{visibility: visibility})
497 when visibility not in @valid_visibilities do
498 Logger.error("Could not restrict visibility to #{visibility}")
# Clause 4: no :visibility option - the query is returned untouched.
501 defp restrict_visibility(query, _visibility), do: query
503 defp exclude_visibility(query, %{exclude_visibilities: visibility})
504 when is_list(visibility) do
505 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
510 "activity_visibility(?, ?, ?) = ANY (?)",
518 Logger.error("Could not exclude visibility to #{visibility}")
523 defp exclude_visibility(query, %{exclude_visibilities: visibility})
524 when visibility in @valid_visibilities do
529 "activity_visibility(?, ?, ?) = ?",
538 defp exclude_visibility(query, %{exclude_visibilities: visibility})
539 when visibility not in [nil | @valid_visibilities] do
540 Logger.error("Could not exclude visibility to #{visibility}")
544 defp exclude_visibility(query, _visibility), do: query
546 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
549 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
552 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
555 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
559 defp restrict_thread_visibility(query, _, _), do: query
561 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
564 |> Map.put(:user, reading_user)
565 |> Map.put(:actor_id, user.ap_id)
568 godmode: params[:godmode],
569 reading_user: reading_user
571 |> user_activities_recipients()
572 |> fetch_activities(params)
576 def fetch_user_activities(user, reading_user, params \\ %{}) do
579 |> Map.put(:type, ["Create", "Announce"])
580 |> Map.put(:user, reading_user)
581 |> Map.put(:actor_id, user.ap_id)
582 |> Map.put(:pinned_activity_ids, user.pinned_activities)
585 if User.blocks?(reading_user, user) do
589 |> Map.put(:blocking_user, reading_user)
590 |> Map.put(:muting_user, reading_user)
594 godmode: params[:godmode],
595 reading_user: reading_user
597 |> user_activities_recipients()
598 |> fetch_activities(params)
602 def fetch_statuses(reading_user, params) do
603 params = Map.put(params, :type, ["Create", "Announce"])
606 godmode: params[:godmode],
607 reading_user: reading_user
609 |> user_activities_recipients()
610 |> fetch_activities(params, :offset)
614 defp user_activities_recipients(%{godmode: true}), do: []
616 defp user_activities_recipients(%{reading_user: reading_user}) do
618 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
620 [Constants.as_public()]
624 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
625 raise "Can't use the child object without preloading!"
628 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
630 [activity, object] in query,
633 "?->>'type' != ? or ?->>'actor' != ?",
642 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
# An empty-string or absent `:since_id` leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
652 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
653 raise "Can't use the child object without preloading!"
656 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
658 [_activity, object] in query,
659 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
663 defp restrict_tag_reject(query, _), do: query
665 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
666 raise "Can't use the child object without preloading!"
669 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
671 [_activity, object] in query,
672 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
676 defp restrict_tag_all(query, _), do: query
678 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
679 raise "Can't use the child object without preloading!"
682 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
689 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
691 [_activity, object] in query,
692 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
696 defp restrict_tag(query, _), do: query
# Limits the query to activities whose recipients overlap `recipients`.
#
#   * empty recipient list - no restriction at all;
#   * no user - overlap condition only;
#   * with a user - overlap condition, OR-ed with "the user is the actor".
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
# With an `:actor_id` option, keeps only activities authored by that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
# Filters on the activity's JSON "type".
# A binary `:type` is matched for equality; any other value is passed to
# `= ANY(?)`; without the option the query is unchanged.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
# With a `:state` option, keeps only activities whose JSON "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
740 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
742 [_activity, object] in query,
743 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
747 defp restrict_favorited_by(query, _), do: query
749 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
750 raise "Can't use the child object without preloading!"
753 defp restrict_media(query, %{only_media: true}) do
755 [activity, object] in query,
756 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
757 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
761 defp restrict_media(query, _), do: query
763 defp restrict_replies(query, %{exclude_replies: true}) do
765 [_activity, object] in query,
766 where: fragment("?->>'inReplyTo' is null", object.data)
770 defp restrict_replies(query, %{
771 reply_filtering_user: %User{} = user,
772 reply_visibility: "self"
775 [activity, object] in query,
778 "?->>'inReplyTo' is null OR ? = ANY(?)",
786 defp restrict_replies(query, %{
787 reply_filtering_user: %User{} = user,
788 reply_visibility: "following"
791 [activity, object] in query,
795 ?->>'type' != 'Create' -- This isn't a Create
796 OR ?->>'inReplyTo' is null -- this isn't a reply
797 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
798 -- unless they are the author (because authors
799 -- are also part of the recipients). This leads
800 -- to a bug that self-replies by friends won't
802 OR ? = ? -- The actor is us
806 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
815 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities of type "Announce".
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
823 defp restrict_muted(query, %{with_muted: true}), do: query
825 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
826 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
829 from([activity] in query,
830 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
833 "not (?->'to' \\?| ?) or ? = ?",
841 unless opts[:skip_preload] do
842 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
848 defp restrict_muted(query, _), do: query
850 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
851 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
852 domain_blocks = user.domain_blocks || []
854 following_ap_ids = User.get_friends_ap_ids(user)
857 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
860 [activity, object: o] in query,
861 # You don't block the author
862 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
864 # You don't block any recipients, and didn't author the post
867 "((not (? && ?)) or ? = ?)",
874 # You don't block the domain of any recipients, and didn't author the post
877 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
884 # It's not a boost of a user you block
887 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
893 # You don't block the author's domain, and also don't follow the author
896 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
903 # Same as above, but checks the Object
906 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
915 defp restrict_blocked(query, _), do: query
917 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
918 if Config.get([:activitypub, :blockers_visible]) == true do
921 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
925 # The author doesn't block you
926 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
928 # It's not a boost of a user that blocks you
931 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
940 defp restrict_blockers_visibility(query, _), do: query
942 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
947 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
949 ^[Constants.as_public()]
954 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and a list of `:pinned_activity_ids`, keeps only the
# activities whose id is in that list.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
962 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
963 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
969 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
977 defp restrict_muted_reblogs(query, _), do: query
979 defp restrict_instance(query, %{instance: instance}) do
984 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
988 from(activity in query, where: activity.actor in ^users)
991 defp restrict_instance(query, _), do: query
993 defp restrict_filtered(query, %{user: %User{} = user}) do
994 case Filter.compose_regex(user) do
999 from([activity, object] in query,
1001 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1002 activity.actor == ^user.ap_id
1007 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1008 restrict_filtered(query, %{user: user})
1011 defp restrict_filtered(query, _), do: query
1013 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1015 defp exclude_poll_votes(query, _) do
1016 if has_named_binding?(query, :object) do
1017 from([activity, object: o] in query,
1018 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1025 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1027 defp exclude_chat_messages(query, _) do
1028 if has_named_binding?(query, :object) do
1029 from([activity, object: o] in query,
1030 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1037 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1039 defp exclude_invisible_actors(query, _opts) do
1041 User.Query.build(%{invisible: true, select: [:ap_id]})
1043 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1045 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary `:exclude_id`, removes the activity with that id from the result.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1054 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1056 defp maybe_preload_objects(query, _) do
1058 |> Activity.with_preloaded_object()
1061 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1063 defp maybe_preload_bookmarks(query, opts) do
1065 |> Activity.with_preloaded_bookmark(opts[:user])
1068 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1070 |> Activity.with_preloaded_report_notes()
1073 defp maybe_preload_report_notes(query, _), do: query
1075 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1077 defp maybe_set_thread_muted_field(query, opts) do
1079 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id when `:order` is `:desc` or `:asc`; otherwise leaves the query as is.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1094 defp fetch_activities_query_ap_ids_ops(opts) do
1095 source_user = opts[:muting_user]
1096 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1098 ap_id_relationships =
1099 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1100 [:block | ap_id_relationships]
1105 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1107 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1108 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1110 restrict_muted_reblogs_opts =
1111 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1113 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the activity query by piping it through every preload, ordering,
# restrict_* and exclude_* step below, each driven by `opts`.
#
#   * recipients - passed to restrict_recipients/3 together with opts[:user].
#   * opts - option map (default %{}).
1116 def fetch_activities_query(recipients, opts \\ %{}) do
# Option maps for the block/mute/reblog-mute steps, prepared by
# fetch_activities_query_ap_ids_ops/1.
1117 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1118 fetch_activities_query_ap_ids_ops(opts)
1121 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1125 |> maybe_preload_objects(opts)
1126 |> maybe_preload_bookmarks(opts)
1127 |> maybe_preload_report_notes(opts)
1128 |> maybe_set_thread_muted_field(opts)
1129 |> maybe_order(opts)
1130 |> restrict_recipients(recipients, opts[:user])
1131 |> restrict_replies(opts)
1132 |> restrict_tag(opts)
1133 |> restrict_tag_reject(opts)
1134 |> restrict_tag_all(opts)
1135 |> restrict_since(opts)
1136 |> restrict_local(opts)
1137 |> restrict_actor(opts)
1138 |> restrict_type(opts)
1139 |> restrict_state(opts)
1140 |> restrict_favorited_by(opts)
1141 |> restrict_blocked(restrict_blocked_opts)
1142 |> restrict_blockers_visibility(opts)
1143 |> restrict_muted(restrict_muted_opts)
1144 |> restrict_filtered(opts)
1145 |> restrict_media(opts)
1146 |> restrict_visibility(opts)
1147 |> restrict_thread_visibility(opts, config)
1148 |> restrict_reblogs(opts)
1149 |> restrict_pinned(opts)
1150 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1151 |> restrict_instance(opts)
1152 |> restrict_announce_object_actor(opts)
# NOTE(review): restrict_filtered(opts) was already applied earlier in this
# pipeline with the same opts; this second application looks redundant.
1153 |> restrict_filtered(opts)
1154 |> Activity.restrict_deactivated_users()
1155 |> exclude_poll_votes(opts)
1156 |> exclude_chat_messages(opts)
1157 |> exclude_invisible_actors(opts)
1158 |> exclude_visibility(opts)
1161 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1162 list_memberships = Pleroma.List.memberships(opts[:user])
1164 fetch_activities_query(recipients ++ list_memberships, opts)
1165 |> Pagination.fetch_paginated(opts, pagination)
1167 |> maybe_update_cc(list_memberships, opts[:user])
1171 Fetch favorites activities of user with order by sort adds to favorites
1173 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1174 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1176 |> Activity.Queries.by_actor()
1177 |> Activity.Queries.by_type("Like")
1178 |> Activity.with_joined_object()
1179 |> Object.with_joined_activity()
1180 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1181 |> order_by([like, _, _], desc_nulls_last: like.id)
1182 |> Pagination.fetch_paginated(
1183 Map.merge(params, %{skip_order: true}),
# Adds the reading user to "cc" of activities that were addressed to one of
# the user's lists.
#
# For every activity whose "bcc" contains an entry of `list_memberships`, the
# user's AP id is prepended to the activity's "cc". All other activities are
# returned unchanged. Without list memberships or a user, the input is
# returned as is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent; wrapping it keeps a nil (or scalar) value from
        # becoming the tail of an improper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1204 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1205 from(activity in query,
1207 fragment("? && ?", activity.recipients, ^recipients) or
1208 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1209 ^Constants.as_public() in activity.recipients)
1213 def fetch_activities_bounded(
1215 recipients_with_public,
1217 pagination \\ :keyset
1219 fetch_activities_query([], opts)
1220 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1221 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the
# resulting data, with "actor" set when `opts[:actor]` is present.
# Anything other than `{:ok, data}` from the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      Repo.insert(%Object{data: Maps.put_if_present(stored, "actor", opts[:actor])})

    failure ->
      failure
  end
end
1234 @spec get_actor_url(any()) :: binary() | nil
1235 defp get_actor_url(url) when is_binary(url), do: url
1236 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1238 defp get_actor_url(url) when is_list(url) do
1244 defp get_actor_url(_url), do: nil
# Converts a remote actor object (string-keyed map) into the attribute map
# used for a user: uri, lock/discoverability flags, follower/following
# addresses, bio, keys, inboxes and nickname.
1246 defp object_to_user_data(data) do
# NOTE(review): data["icon"]["url"] assumes "icon" is a map or nil; other
# shapes from a remote server may raise - confirm against callers.
1248 data["icon"]["url"] &&
1251 "url" => [%{"href" => data["icon"]["url"]}]
1255 data["image"]["url"] &&
1258 "url" => [%{"href" => data["image"]["url"]}]
1263 |> Map.get("attachment", [])
# NOTE(review): the single-clause fn below raises FunctionClauseError for
# any attachment that is not a map with a "type" key.
1264 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1265 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1269 |> Map.get("tag", [])
1271 %{"type" => "Emoji"} -> true
# Maps emoji name (surrounding colons trimmed) to its icon URL.
# NOTE(review): raises if an entry lacks "icon" => %{"url" => _} or "name".
1274 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1275 {String.trim(name, ":"), url}
1278 is_locked = data["manuallyApprovesFollowers"] || false
1279 capabilities = data["capabilities"] || %{}
1280 accepts_chat_messages = capabilities["acceptsChatMessages"]
# From here on `data` is the result of Transmogrifier.maybe_fix_user_object/1;
# the values read above come from the data as originally passed in.
1281 data = Transmogrifier.maybe_fix_user_object(data)
1282 is_discoverable = data["discoverable"] || false
1283 invisible = data["invisible"] || false
1284 actor_type = data["type"] || "Person"
# The public key and shared inbox are only read when their containers are
# maps and the values are binaries.
1287 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1288 data["publicKey"]["publicKeyPem"]
1294 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1295 data["endpoints"]["sharedInbox"]
1302 uri: get_actor_url(data["url"]),
1307 is_locked: is_locked,
1308 is_discoverable: is_discoverable,
1309 invisible: invisible,
1312 follower_address: data["followers"],
1313 following_address: data["following"],
1314 bio: data["summary"] || "",
1315 actor_type: actor_type,
1316 also_known_as: Map.get(data, "alsoKnownAs", []),
1317 public_key: public_key,
1318 inbox: data["inbox"],
1319 shared_inbox: shared_inbox,
1320 accepts_chat_messages: accepts_chat_messages
1323 # nickname can be nil because of virtual actors
1324 if data["preferredUsername"] do
# Nickname is "<preferredUsername>@<host of the actor id>".
1328 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1331 Map.put(user_data, :nickname, nil)
1335 def fetch_follow_information_for_user(user) do
1336 with {:ok, following_data} <-
1337 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
1340 {:ok, hide_follows} <- collection_private(following_data),
1341 {:ok, followers_data} <-
1342 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
1343 {:ok, hide_followers} <- collection_private(followers_data) do
1346 hide_follows: hide_follows,
1347 follower_count: normalize_counter(followers_data["totalItems"]),
1348 following_count: normalize_counter(following_data["totalItems"]),
1349 hide_followers: hide_followers
1352 {:error, _} = e -> e
# Normalizes a counter taken from a remote collection ("totalItems").
#
# Returns the value when it is a non-negative integer and 0 for anything else
# (missing, non-integer, or a negative number, which is never a valid count).
defp normalize_counter(counter) when is_integer(counter) and counter >= 0, do: counter
defp normalize_counter(_), do: 0
1360 def maybe_update_follow_information(user_data) do
1361 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1362 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1364 {:collections_available,
1365 !!(user_data[:following_address] && user_data[:follower_address])},
1367 fetch_follow_information_for_user(user_data) do
1368 info = Map.merge(user_data[:info] || %{}, info)
1371 |> Map.put(:info, info)
1373 {:user_type_check, false} ->
1376 {:collections_available, false} ->
1379 {:enabled, false} ->
1384 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1391 defp collection_private(%{"first" => %{"type" => type}})
1392 when type in ["CollectionPage", "OrderedCollectionPage"],
1395 defp collection_private(%{"first" => first}) do
1396 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1397 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1400 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1401 {:error, _} = e -> e
1406 defp collection_private(_data), do: {:ok, true}
1408 def user_data_from_user_object(data) do
1409 with {:ok, data} <- MRF.filter(data) do
1410 {:ok, object_to_user_data(data)}
1416 def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
1417 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
1418 {:ok, data} <- user_data_from_user_object(data) do
1419 {:ok, maybe_update_follow_information(data)}
1421 {:error, "Object has been deleted" = e} ->
1422 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1425 {:error, {:reject, reason} = e} ->
1426 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1430 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1435 def maybe_handle_clashing_nickname(data) do
1436 with nickname when is_binary(nickname) <- data[:nickname],
1437 %User{} = old_user <- User.get_by_nickname(nickname),
1438 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1440 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1446 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1447 |> User.update_and_set_cache()
1449 {:ap_id_comparison, true} ->
1451 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1459 def make_user_from_ap_id(ap_id, opts \\ []) do
1460 user = User.get_cached_by_ap_id(ap_id)
1462 if user && !User.ap_enabled?(user) do
1463 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1465 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
1468 |> User.remote_user_changeset(data)
1469 |> User.update_and_set_cache()
1471 maybe_handle_clashing_nickname(data)
1474 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id found.
# Any WebFinger result without a non-nil "ap_id" yields
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently only the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1500 def fetch_direct_messages_query do
1502 |> restrict_type(%{type: "Create"})
1503 |> restrict_visibility(%{visibility: "direct"})
1504 |> order_by([activity], asc: activity.id)