1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
130 %Activity{} = activity ->
136 {:containment, _} = error ->
139 {:error, _} = error ->
142 {:fake, true, map, recipients} ->
143 activity = %Activity{
147 recipients: recipients,
151 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
154 {:remote_limit_pass, _} ->
155 {:error, :remote_limit}
162 defp insert_activity_with_expiration(data, local, recipients) do
166 actor: data["actor"],
167 recipients: recipients
170 with {:ok, activity} <- Repo.insert(struct) do
171 maybe_create_activity_expiration(activity)
175 def notify_and_stream(activity) do
176 Notification.create_notifications(activity)
178 conversation = create_or_bump_conversation(activity, activity.actor)
179 participations = get_participations(conversation)
181 stream_out_participations(participations)
184 defp maybe_create_activity_expiration(
185 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
189 activity_id: activity.id,
190 expires_at: expires_at
196 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
198 defp create_or_bump_conversation(activity, actor) do
199 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
200 %User{} = user <- User.get_cached_by_ap_id(actor) do
201 Participation.mark_as_read(user, conversation)
206 defp get_participations({:ok, conversation}) do
208 |> Repo.preload(:participations, force: true)
209 |> Map.get(:participations)
212 defp get_participations(_), do: []
214 def stream_out_participations(participations) do
217 |> Repo.preload(:user)
219 Streamer.stream("participation", participations)
222 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
223 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
224 conversation = Repo.preload(conversation, :participations)
227 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
232 if last_activity_id do
233 stream_out_participations(conversation.participations)
238 def stream_out_participations(_, _), do: :noop
240 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
241 when data_type in ["Create", "Announce", "Delete"] do
243 |> Topics.get_activity_topics()
244 |> Streamer.stream(activity)
247 def stream_out(_activity) do
251 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
252 def create(params, fake \\ false) do
253 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
258 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
259 additional = params[:additional] || %{}
260 # only accept false as false value
261 local = !(params[:local] == false)
262 published = params[:published]
263 quick_insert? = Config.get([:env]) == :benchmark
267 %{to: to, actor: actor, published: published, context: context, object: object},
271 with {:ok, activity} <- insert(create_data, local, fake),
272 {:fake, false, activity} <- {:fake, fake, activity},
273 _ <- increase_replies_count_if_reply(create_data),
274 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
275 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
276 _ <- notify_and_stream(activity),
277 :ok <- maybe_federate(activity) do
280 {:quick_insert, true, activity} ->
283 {:fake, true, activity} ->
287 Repo.rollback(message)
291 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
292 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
293 additional = params[:additional] || %{}
294 # only accept false as false value
295 local = !(params[:local] == false)
296 published = params[:published]
300 %{to: to, actor: actor, published: published, context: context, object: object},
304 with {:ok, activity} <- insert(listen_data, local),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_federate(activity) do
311 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
312 {:ok, Activity.t()} | nil | {:error, any()}
313 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
314 with {:ok, result} <-
315 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
320 defp do_unfollow(follower, followed, activity_id, local) do
321 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
322 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
323 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
324 {:ok, activity} <- insert(unfollow_data, local),
325 _ <- notify_and_stream(activity),
326 :ok <- maybe_federate(activity) do
330 {:error, error} -> Repo.rollback(error)
334 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
344 # only accept false as false value
345 local = !(params[:local] == false)
346 forward = !(params[:forward] == false)
348 additional = params[:additional] || %{}
352 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
354 Map.merge(additional, %{"to" => [], "cc" => []})
357 with flag_data <- make_flag_data(params, additional),
358 {:ok, activity} <- insert(flag_data, local),
359 {:ok, stripped_activity} <- strip_report_status_data(activity),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(stripped_activity) do
362 User.all_superusers()
363 |> Enum.filter(fn user -> not is_nil(user.email) end)
364 |> Enum.each(fn superuser ->
366 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
367 |> Pleroma.Emails.Mailer.deliver_async()
374 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
375 def move(%User{} = origin, %User{} = target, local \\ true) do
378 "actor" => origin.ap_id,
379 "object" => origin.ap_id,
380 "target" => target.ap_id
383 with true <- origin.ap_id in target.also_known_as,
384 {:ok, activity} <- insert(params, local),
385 _ <- notify_and_stream(activity) do
386 maybe_federate(activity)
388 BackgroundWorker.enqueue("move_following", %{
389 "origin_id" => origin.id,
390 "target_id" => target.id
395 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
400 def fetch_activities_for_context_query(context, opts) do
401 public = [Constants.as_public()]
405 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
408 from(activity in Activity)
409 |> maybe_preload_objects(opts)
410 |> maybe_preload_bookmarks(opts)
411 |> maybe_set_thread_muted_field(opts)
412 |> restrict_blocked(opts)
413 |> restrict_blockers_visibility(opts)
414 |> restrict_recipients(recipients, opts[:user])
415 |> restrict_filtered(opts)
419 "?->>'type' = ? and ?->>'context' = ?",
426 |> exclude_poll_votes(opts)
428 |> order_by([activity], desc: activity.id)
431 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
432 def fetch_activities_for_context(context, opts \\ %{}) do
434 |> fetch_activities_for_context_query(opts)
438 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
439 FlakeId.Ecto.CompatType.t() | nil
440 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
442 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
443 |> restrict_visibility(%{visibility: "direct"})
449 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
450 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
451 opts = Map.delete(opts, :user)
453 [Constants.as_public()]
454 |> fetch_activities_query(opts)
455 |> restrict_unlisted(opts)
456 |> Pagination.fetch_paginated(opts, pagination)
459 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
460 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
462 |> Map.put(:restrict_unlisted, true)
463 |> fetch_public_or_unlisted_activities(pagination)
466 @valid_visibilities ~w[direct unlisted public private]
468 defp restrict_visibility(query, %{visibility: visibility})
469 when is_list(visibility) do
470 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
475 "activity_visibility(?, ?, ?) = ANY (?)",
483 Logger.error("Could not restrict visibility to #{visibility}")
487 defp restrict_visibility(query, %{visibility: visibility})
488 when visibility in @valid_visibilities do
492 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
496 defp restrict_visibility(_query, %{visibility: visibility})
497 when visibility not in @valid_visibilities do
498 Logger.error("Could not restrict visibility to #{visibility}")
501 defp restrict_visibility(query, _visibility), do: query
503 defp exclude_visibility(query, %{exclude_visibilities: visibility})
504 when is_list(visibility) do
505 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
510 "activity_visibility(?, ?, ?) = ANY (?)",
518 Logger.error("Could not exclude visibility to #{visibility}")
523 defp exclude_visibility(query, %{exclude_visibilities: visibility})
524 when visibility in @valid_visibilities do
529 "activity_visibility(?, ?, ?) = ?",
538 defp exclude_visibility(query, %{exclude_visibilities: visibility})
539 when visibility not in [nil | @valid_visibilities] do
540 Logger.error("Could not exclude visibility to #{visibility}")
544 defp exclude_visibility(query, _visibility), do: query
546 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
549 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
552 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
555 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
559 defp restrict_thread_visibility(query, _, _), do: query
561 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
564 |> Map.put(:user, reading_user)
565 |> Map.put(:actor_id, user.ap_id)
568 godmode: params[:godmode],
569 reading_user: reading_user
571 |> user_activities_recipients()
572 |> fetch_activities(params)
576 def fetch_user_activities(user, reading_user, params \\ %{}) do
579 |> Map.put(:type, ["Create", "Announce"])
580 |> Map.put(:user, reading_user)
581 |> Map.put(:actor_id, user.ap_id)
582 |> Map.put(:pinned_activity_ids, user.pinned_activities)
585 if User.blocks?(reading_user, user) do
589 |> Map.put(:blocking_user, reading_user)
590 |> Map.put(:muting_user, reading_user)
594 godmode: params[:godmode],
595 reading_user: reading_user
597 |> user_activities_recipients()
598 |> fetch_activities(params)
602 def fetch_statuses(reading_user, params) do
603 params = Map.put(params, :type, ["Create", "Announce"])
606 godmode: params[:godmode],
607 reading_user: reading_user
609 |> user_activities_recipients()
610 |> fetch_activities(params, :offset)
614 defp user_activities_recipients(%{godmode: true}), do: []
616 defp user_activities_recipients(%{reading_user: reading_user}) do
618 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
620 [Constants.as_public()]
624 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
625 raise "Can't use the child object without preloading!"
628 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
630 [activity, object] in query,
633 "?->>'type' != ? or ?->>'actor' != ?",
642 defp restrict_announce_object_actor(query, _), do: query
644 defp restrict_since(query, %{since_id: ""}), do: query
646 defp restrict_since(query, %{since_id: since_id}) do
647 from(activity in query, where: activity.id > ^since_id)
650 defp restrict_since(query, _), do: query
652 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
653 raise "Can't use the child object without preloading!"
656 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
658 [_activity, object] in query,
659 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
663 defp restrict_tag_reject(query, _), do: query
665 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
666 raise "Can't use the child object without preloading!"
669 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
671 [_activity, object] in query,
672 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
676 defp restrict_tag_all(query, _), do: query
678 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
679 raise "Can't use the child object without preloading!"
682 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
684 [_activity, object] in query,
685 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
689 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
691 [_activity, object] in query,
692 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
696 defp restrict_tag(query, _), do: query
698 defp restrict_recipients(query, [], _user), do: query
700 defp restrict_recipients(query, recipients, nil) do
701 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
704 defp restrict_recipients(query, recipients, user) do
707 where: fragment("? && ?", ^recipients, activity.recipients),
708 or_where: activity.actor == ^user.ap_id
712 defp restrict_local(query, %{local_only: true}) do
713 from(activity in query, where: activity.local == true)
716 defp restrict_local(query, _), do: query
718 defp restrict_actor(query, %{actor_id: actor_id}) do
719 from(activity in query, where: activity.actor == ^actor_id)
722 defp restrict_actor(query, _), do: query
724 defp restrict_type(query, %{type: type}) when is_binary(type) do
725 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
728 defp restrict_type(query, %{type: type}) do
729 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
732 defp restrict_type(query, _), do: query
734 defp restrict_state(query, %{state: state}) do
735 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
738 defp restrict_state(query, _), do: query
740 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
742 [_activity, object] in query,
743 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
747 defp restrict_favorited_by(query, _), do: query
749 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
750 raise "Can't use the child object without preloading!"
753 defp restrict_media(query, %{only_media: true}) do
755 [activity, object] in query,
756 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
757 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
761 defp restrict_media(query, _), do: query
763 defp restrict_replies(query, %{exclude_replies: true}) do
765 [_activity, object] in query,
766 where: fragment("?->>'inReplyTo' is null", object.data)
770 defp restrict_replies(query, %{
771 reply_filtering_user: %User{} = user,
772 reply_visibility: "self"
775 [activity, object] in query,
778 "?->>'inReplyTo' is null OR ? = ANY(?)",
786 defp restrict_replies(query, %{
787 reply_filtering_user: %User{} = user,
788 reply_visibility: "following"
791 [activity, object] in query,
795 ?->>'type' != 'Create' -- This isn't a Create
796 OR ?->>'inReplyTo' is null -- this isn't a reply
797 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
798 -- unless they are the author (because authors
799 -- are also part of the recipients). This leads
800 -- to a bug that self-replies by friends won't
802 OR ? = ? -- The actor is us
806 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
815 defp restrict_replies(query, _), do: query
817 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
818 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
821 defp restrict_reblogs(query, _), do: query
823 defp restrict_muted(query, %{with_muted: true}), do: query
825 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
826 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
829 from([activity] in query,
830 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
831 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
834 unless opts[:skip_preload] do
835 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
841 defp restrict_muted(query, _), do: query
843 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
844 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
845 domain_blocks = user.domain_blocks || []
847 following_ap_ids = User.get_friends_ap_ids(user)
850 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
853 [activity, object: o] in query,
854 # You don't block the author
855 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
857 # You don't block any recipients, and didn't author the post
860 "((not (? && ?)) or ? = ?)",
867 # You don't block the domain of any recipients, and didn't author the post
870 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
877 # It's not a boost of a user you block
880 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
886 # You don't block the author's domain, and also don't follow the author
889 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
896 # Same as above, but checks the Object
899 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
908 defp restrict_blocked(query, _), do: query
910 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
911 if Config.get([:activitypub, :blockers_visible]) == true do
914 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
918 # The author doesn't block you
919 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
921 # It's not a boost of a user that blocks you
924 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
933 defp restrict_blockers_visibility(query, _), do: query
935 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
940 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
942 ^[Constants.as_public()]
947 defp restrict_unlisted(query, _), do: query
949 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
950 from(activity in query, where: activity.id in ^ids)
953 defp restrict_pinned(query, _), do: query
955 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
956 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
962 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
970 defp restrict_muted_reblogs(query, _), do: query
972 defp restrict_instance(query, %{instance: instance}) do
977 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
981 from(activity in query, where: activity.actor in ^users)
984 defp restrict_instance(query, _), do: query
986 defp restrict_filtered(query, %{user: %User{} = user}) do
987 case Filter.compose_regex(user) do
992 from([activity, object] in query,
994 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
995 activity.actor == ^user.ap_id
1000 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1001 restrict_filtered(query, %{user: user})
1004 defp restrict_filtered(query, _), do: query
1006 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1008 defp exclude_poll_votes(query, _) do
1009 if has_named_binding?(query, :object) do
1010 from([activity, object: o] in query,
1011 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1018 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1020 defp exclude_chat_messages(query, _) do
1021 if has_named_binding?(query, :object) do
1022 from([activity, object: o] in query,
1023 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1030 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1032 defp exclude_invisible_actors(query, _opts) do
1034 User.Query.build(%{invisible: true, select: [:ap_id]})
1036 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1038 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1041 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1042 from(activity in query, where: activity.id != ^id)
1045 defp exclude_id(query, _), do: query
1047 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1049 defp maybe_preload_objects(query, _) do
1051 |> Activity.with_preloaded_object()
1054 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1056 defp maybe_preload_bookmarks(query, opts) do
1058 |> Activity.with_preloaded_bookmark(opts[:user])
1061 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1063 |> Activity.with_preloaded_report_notes()
1066 defp maybe_preload_report_notes(query, _), do: query
1068 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1070 defp maybe_set_thread_muted_field(query, opts) do
1072 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1075 defp maybe_order(query, %{order: :desc}) do
1077 |> order_by(desc: :id)
1080 defp maybe_order(query, %{order: :asc}) do
1082 |> order_by(asc: :id)
1085 defp maybe_order(query, _), do: query
1087 defp fetch_activities_query_ap_ids_ops(opts) do
1088 source_user = opts[:muting_user]
1089 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1091 ap_id_relationships =
1092 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1093 [:block | ap_id_relationships]
1098 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1100 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1101 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1103 restrict_muted_reblogs_opts =
1104 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1106 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1109 def fetch_activities_query(recipients, opts \\ %{}) do
1110 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1111 fetch_activities_query_ap_ids_ops(opts)
1114 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1118 |> maybe_preload_objects(opts)
1119 |> maybe_preload_bookmarks(opts)
1120 |> maybe_preload_report_notes(opts)
1121 |> maybe_set_thread_muted_field(opts)
1122 |> maybe_order(opts)
1123 |> restrict_recipients(recipients, opts[:user])
1124 |> restrict_replies(opts)
1125 |> restrict_tag(opts)
1126 |> restrict_tag_reject(opts)
1127 |> restrict_tag_all(opts)
1128 |> restrict_since(opts)
1129 |> restrict_local(opts)
1130 |> restrict_actor(opts)
1131 |> restrict_type(opts)
1132 |> restrict_state(opts)
1133 |> restrict_favorited_by(opts)
1134 |> restrict_blocked(restrict_blocked_opts)
1135 |> restrict_blockers_visibility(opts)
1136 |> restrict_muted(restrict_muted_opts)
1137 |> restrict_filtered(opts)
1138 |> restrict_media(opts)
1139 |> restrict_visibility(opts)
1140 |> restrict_thread_visibility(opts, config)
1141 |> restrict_reblogs(opts)
1142 |> restrict_pinned(opts)
1143 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1144 |> restrict_instance(opts)
1145 |> restrict_announce_object_actor(opts)
1146 |> restrict_filtered(opts)
1147 |> Activity.restrict_deactivated_users()
1148 |> exclude_poll_votes(opts)
1149 |> exclude_chat_messages(opts)
1150 |> exclude_invisible_actors(opts)
1151 |> exclude_visibility(opts)
1154 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1155 list_memberships = Pleroma.List.memberships(opts[:user])
1157 fetch_activities_query(recipients ++ list_memberships, opts)
1158 |> Pagination.fetch_paginated(opts, pagination)
1160 |> maybe_update_cc(list_memberships, opts[:user])
1164 Fetch favorites activities of user with order by sort adds to favorites
1166 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1167 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1169 |> Activity.Queries.by_actor()
1170 |> Activity.Queries.by_type("Like")
1171 |> Activity.with_joined_object()
1172 |> Object.with_joined_activity()
1173 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1174 |> order_by([like, _, _], desc_nulls_last: like.id)
1175 |> Pagination.fetch_paginated(
1176 Map.merge(params, %{skip_order: true}),
1181 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1182 Enum.map(activities, fn
1183 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1184 if Enum.any?(bcc, &(&1 in list_memberships)) do
1185 update_in(activity.data["cc"], &[user_ap_id | &1])
1195 defp maybe_update_cc(activities, _, _), do: activities
1197 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1198 from(activity in query,
1200 fragment("? && ?", activity.recipients, ^recipients) or
1201 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1202 ^Constants.as_public() in activity.recipients)
1206 def fetch_activities_bounded(
1208 recipients_with_public,
1210 pagination \\ :keyset
1212 fetch_activities_query([], opts)
1213 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1214 |> Pagination.fetch_paginated(opts, pagination)
1218 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1219 def upload(file, opts \\ []) do
1220 with {:ok, data} <- Upload.store(file, opts) do
1221 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1223 Repo.insert(%Object{data: obj_data})
1227 @spec get_actor_url(any()) :: binary() | nil
1228 defp get_actor_url(url) when is_binary(url), do: url
1229 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1231 defp get_actor_url(url) when is_list(url) do
1237 defp get_actor_url(_url), do: nil
1239 defp object_to_user_data(data) do
1241 data["icon"]["url"] &&
1244 "url" => [%{"href" => data["icon"]["url"]}]
1248 data["image"]["url"] &&
1251 "url" => [%{"href" => data["image"]["url"]}]
1256 |> Map.get("attachment", [])
1257 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1258 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1262 |> Map.get("tag", [])
1264 %{"type" => "Emoji"} -> true
1267 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1268 {String.trim(name, ":"), url}
1271 locked = data["manuallyApprovesFollowers"] || false
1272 capabilities = data["capabilities"] || %{}
1273 accepts_chat_messages = capabilities["acceptsChatMessages"]
1274 data = Transmogrifier.maybe_fix_user_object(data)
1275 discoverable = data["discoverable"] || false
1276 invisible = data["invisible"] || false
1277 actor_type = data["type"] || "Person"
1280 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1281 data["publicKey"]["publicKeyPem"]
1287 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1288 data["endpoints"]["sharedInbox"]
1295 uri: get_actor_url(data["url"]),
1301 discoverable: discoverable,
1302 invisible: invisible,
1305 follower_address: data["followers"],
1306 following_address: data["following"],
1307 bio: data["summary"] || "",
1308 actor_type: actor_type,
1309 also_known_as: Map.get(data, "alsoKnownAs", []),
1310 public_key: public_key,
1311 inbox: data["inbox"],
1312 shared_inbox: shared_inbox,
1313 accepts_chat_messages: accepts_chat_messages
1316 # nickname can be nil because of virtual actors
1317 if data["preferredUsername"] do
1321 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1324 Map.put(user_data, :nickname, nil)
1328 def fetch_follow_information_for_user(user) do
1329 with {:ok, following_data} <-
1330 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
1333 {:ok, hide_follows} <- collection_private(following_data),
1334 {:ok, followers_data} <-
1335 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
1336 {:ok, hide_followers} <- collection_private(followers_data) do
1339 hide_follows: hide_follows,
1340 follower_count: normalize_counter(followers_data["totalItems"]),
1341 following_count: normalize_counter(following_data["totalItems"]),
1342 hide_followers: hide_followers
1345 {:error, _} = e -> e
1350 defp normalize_counter(counter) when is_integer(counter), do: counter
1351 defp normalize_counter(_), do: 0
1353 def maybe_update_follow_information(user_data) do
1354 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1355 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1357 {:collections_available,
1358 !!(user_data[:following_address] && user_data[:follower_address])},
1360 fetch_follow_information_for_user(user_data) do
1361 info = Map.merge(user_data[:info] || %{}, info)
1364 |> Map.put(:info, info)
1366 {:user_type_check, false} ->
1369 {:collections_available, false} ->
1372 {:enabled, false} ->
1377 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1384 defp collection_private(%{"first" => %{"type" => type}})
1385 when type in ["CollectionPage", "OrderedCollectionPage"],
1388 defp collection_private(%{"first" => first}) do
1389 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1390 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1393 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1394 {:error, _} = e -> e
1399 defp collection_private(_data), do: {:ok, true}
1401 def user_data_from_user_object(data) do
1402 with {:ok, data} <- MRF.filter(data) do
1403 {:ok, object_to_user_data(data)}
1409 def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
1410 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
1411 {:ok, data} <- user_data_from_user_object(data) do
1412 {:ok, maybe_update_follow_information(data)}
1414 {:error, "Object has been deleted" = e} ->
1415 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1418 {:error, {:reject, reason} = e} ->
1419 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1423 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1428 def maybe_handle_clashing_nickname(data) do
1429 with nickname when is_binary(nickname) <- data[:nickname],
1430 %User{} = old_user <- User.get_by_nickname(nickname),
1431 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1433 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1439 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1440 |> User.update_and_set_cache()
1442 {:ap_id_comparison, true} ->
1444 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1452 def make_user_from_ap_id(ap_id, opts \\ []) do
1453 user = User.get_cached_by_ap_id(ap_id)
1455 if user && !User.ap_enabled?(user) do
1456 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1458 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
1461 |> User.remote_user_changeset(data)
1462 |> User.update_and_set_cache()
1464 maybe_handle_clashing_nickname(data)
1467 |> User.remote_user_changeset()
1475 def make_user_from_nickname(nickname) do
1476 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1477 make_user_from_ap_id(ap_id)
1479 _e -> {:error, "No AP id in WebFinger"}
1483 # filter out broken threads
1484 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1485 entire_thread_visible_for_user?(activity, user)
1488 # do post-processing on a specific activity
1489 def contain_activity(%Activity{} = activity, %User{} = user) do
1490 contain_broken_threads(activity, user)
1493 def fetch_direct_messages_query do
1495 |> restrict_type(%{type: "Create"})
1496 |> restrict_visibility(%{visibility: "direct"})
1497 |> order_by([activity], asc: activity.id)