1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Checks the length of an activity's object content against the configured
# [:instance, :remote_limit] value.
#
# The first clause only matches when the activity carries an "object" map whose
# "content" key is present and not nil. It returns true when the content length
# is at or below the limit.
# NOTE(review): the guard only excludes nil; presumably content is always a
# string here, otherwise String.length/1 would not accept it - confirm.
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
# String.length/1 counts graphemes, so the limit is in characters, not bytes.
63 String.length(content) <= limit
# Fallback: anything without a non-nil object content passes the check.
66 defp check_remote_limit(_), do: true
# Calls User.increase_note_count/1 for `actor` when is_public?/1 is truthy for
# `object`; otherwise returns {:ok, actor} without touching the count.
# do_create/2 in this file matches the result against `{:ok, _actor}`.
# NOTE(review): presumably User.increase_note_count/1 also returns an :ok
# tuple - confirm against the User module.
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Counterpart of increase_note_count_if_public/2: calls
# User.decrease_note_count/1 for `actor` when is_public?/1 is truthy for
# `object`; otherwise returns {:ok, actor} unchanged.
# NOTE(review): presumably User.decrease_note_count/1 also returns an :ok
# tuple - confirm against the User module.
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
127 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 %Activity{} = activity ->
138 {:containment, _} = error ->
141 {:error, _} = error ->
144 {:fake, true, map, recipients} ->
145 activity = %Activity{
149 recipients: recipients,
153 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
156 {:remote_limit_pass, _} ->
157 {:error, :remote_limit}
164 defp insert_activity_with_expiration(data, local, recipients) do
168 actor: data["actor"],
169 recipients: recipients
172 with {:ok, activity} <- Repo.insert(struct) do
173 maybe_create_activity_expiration(activity)
177 def notify_and_stream(activity) do
178 Notification.create_notifications(activity)
180 conversation = create_or_bump_conversation(activity, activity.actor)
181 participations = get_participations(conversation)
183 stream_out_participations(participations)
186 defp maybe_create_activity_expiration(
187 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
190 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
191 activity_id: activity.id,
192 expires_at: expires_at
198 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
200 defp create_or_bump_conversation(activity, actor) do
201 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
202 %User{} = user <- User.get_cached_by_ap_id(actor) do
203 Participation.mark_as_read(user, conversation)
208 defp get_participations({:ok, conversation}) do
210 |> Repo.preload(:participations, force: true)
211 |> Map.get(:participations)
214 defp get_participations(_), do: []
216 def stream_out_participations(participations) do
219 |> Repo.preload(:user)
221 Streamer.stream("participation", participations)
224 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
225 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
226 conversation = Repo.preload(conversation, :participations)
229 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
240 def stream_out_participations(_, _), do: :noop
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
249 def stream_out(_activity) do
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # treat only an explicit `false` as false; nil or a missing key counts as true
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
269 %{to: to, actor: actor, published: published, context: context, object: object},
273 with {:ok, activity} <- insert(create_data, local, fake),
274 {:fake, false, activity} <- {:fake, fake, activity},
275 _ <- increase_replies_count_if_reply(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 _ <- notify_and_stream(activity),
279 :ok <- maybe_federate(activity) do
282 {:quick_insert, true, activity} ->
285 {:fake, true, activity} ->
289 Repo.rollback(message)
293 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
295 additional = params[:additional] || %{}
296 # treat only an explicit `false` as false; nil or a missing key counts as true
297 local = !(params[:local] == false)
298 published = params[:published]
302 %{to: to, actor: actor, published: published, context: context, object: object},
306 with {:ok, activity} <- insert(listen_data, local),
307 _ <- notify_and_stream(activity),
308 :ok <- maybe_federate(activity) do
313 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
314 {:ok, Activity.t()} | nil | {:error, any()}
315 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
316 with {:ok, result} <-
317 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
322 defp do_unfollow(follower, followed, activity_id, local) do
323 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
324 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
325 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
326 {:ok, activity} <- insert(unfollow_data, local),
327 _ <- notify_and_stream(activity),
328 :ok <- maybe_federate(activity) do
332 {:error, error} -> Repo.rollback(error)
336 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
338 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
352 # treat only an explicit `false` as false; nil or a missing key counts as true
353 local = !(params[:local] == false)
354 forward = !(params[:forward] == false)
356 additional = params[:additional] || %{}
360 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
362 Map.merge(additional, %{"to" => [], "cc" => []})
365 with flag_data <- make_flag_data(params, additional),
366 {:ok, activity} <- insert(flag_data, local),
367 {:ok, stripped_activity} <- strip_report_status_data(activity),
368 _ <- notify_and_stream(activity),
370 maybe_federate(stripped_activity) do
371 User.all_superusers()
372 |> Enum.filter(fn user -> not is_nil(user.email) end)
373 |> Enum.each(fn superuser ->
375 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
376 |> Pleroma.Emails.Mailer.deliver_async()
381 {:error, error} -> Repo.rollback(error)
385 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
386 def move(%User{} = origin, %User{} = target, local \\ true) do
389 "actor" => origin.ap_id,
390 "object" => origin.ap_id,
391 "target" => target.ap_id
394 with true <- origin.ap_id in target.also_known_as,
395 {:ok, activity} <- insert(params, local),
396 _ <- notify_and_stream(activity) do
397 maybe_federate(activity)
399 BackgroundWorker.enqueue("move_following", %{
400 "origin_id" => origin.id,
401 "target_id" => target.id
406 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
411 def fetch_activities_for_context_query(context, opts) do
412 public = [Constants.as_public()]
416 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
419 from(activity in Activity)
420 |> maybe_preload_objects(opts)
421 |> maybe_preload_bookmarks(opts)
422 |> maybe_set_thread_muted_field(opts)
423 |> restrict_blocked(opts)
424 |> restrict_recipients(recipients, opts[:user])
425 |> restrict_filtered(opts)
429 "?->>'type' = ? and ?->>'context' = ?",
436 |> exclude_poll_votes(opts)
438 |> order_by([activity], desc: activity.id)
441 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
442 def fetch_activities_for_context(context, opts \\ %{}) do
444 |> fetch_activities_for_context_query(opts)
448 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
449 FlakeId.Ecto.CompatType.t() | nil
450 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
453 |> restrict_visibility(%{visibility: "direct"})
459 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
460 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
461 opts = Map.delete(opts, :user)
463 [Constants.as_public()]
464 |> fetch_activities_query(opts)
465 |> restrict_unlisted(opts)
466 |> Pagination.fetch_paginated(opts, pagination)
469 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
470 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
472 |> Map.put(:restrict_unlisted, true)
473 |> fetch_public_or_unlisted_activities(pagination)
476 @valid_visibilities ~w[direct unlisted public private]
478 defp restrict_visibility(query, %{visibility: visibility})
479 when is_list(visibility) do
480 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
485 "activity_visibility(?, ?, ?) = ANY (?)",
493 Logger.error("Could not restrict visibility to #{visibility}")
497 defp restrict_visibility(query, %{visibility: visibility})
498 when visibility in @valid_visibilities do
502 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
506 defp restrict_visibility(_query, %{visibility: visibility})
507 when visibility not in @valid_visibilities do
508 Logger.error("Could not restrict visibility to #{visibility}")
511 defp restrict_visibility(query, _visibility), do: query
513 defp exclude_visibility(query, %{exclude_visibilities: visibility})
514 when is_list(visibility) do
515 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
520 "activity_visibility(?, ?, ?) = ANY (?)",
528 Logger.error("Could not exclude visibility to #{visibility}")
533 defp exclude_visibility(query, %{exclude_visibilities: visibility})
534 when visibility in @valid_visibilities do
539 "activity_visibility(?, ?, ?) = ?",
548 defp exclude_visibility(query, %{exclude_visibilities: visibility})
549 when visibility not in [nil | @valid_visibilities] do
550 Logger.error("Could not exclude visibility to #{visibility}")
554 defp exclude_visibility(query, _visibility), do: query
556 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
559 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
562 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
565 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
569 defp restrict_thread_visibility(query, _, _), do: query
571 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
574 |> Map.put(:user, reading_user)
575 |> Map.put(:actor_id, user.ap_id)
578 godmode: params[:godmode],
579 reading_user: reading_user
581 |> user_activities_recipients()
582 |> fetch_activities(params)
586 def fetch_user_activities(user, reading_user, params \\ %{}) do
589 |> Map.put(:type, ["Create", "Announce"])
590 |> Map.put(:user, reading_user)
591 |> Map.put(:actor_id, user.ap_id)
592 |> Map.put(:pinned_activity_ids, user.pinned_activities)
595 if User.blocks?(reading_user, user) do
599 |> Map.put(:blocking_user, reading_user)
600 |> Map.put(:muting_user, reading_user)
604 godmode: params[:godmode],
605 reading_user: reading_user
607 |> user_activities_recipients()
608 |> fetch_activities(params)
612 def fetch_statuses(reading_user, params) do
613 params = Map.put(params, :type, ["Create", "Announce"])
616 godmode: params[:godmode],
617 reading_user: reading_user
619 |> user_activities_recipients()
620 |> fetch_activities(params, :offset)
624 defp user_activities_recipients(%{godmode: true}), do: []
626 defp user_activities_recipients(%{reading_user: reading_user}) do
628 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
630 [Constants.as_public()]
634 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
635 raise "Can't use the child object without preloading!"
638 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
640 [activity, object] in query,
643 "?->>'type' != ? or ?->>'actor' != ?",
652 defp restrict_announce_object_actor(query, _), do: query
# Query filter for the :since_id option.
# An empty-string :since_id is treated like a missing one: the query is
# returned unchanged.
654 defp restrict_since(query, %{since_id: ""}), do: query
# Any other :since_id keeps only activities whose id is greater than it.
656 defp restrict_since(query, %{since_id: since_id}) do
657 from(activity in query, where: activity.id > ^since_id)
# No :since_id key: leave the query unchanged.
660 defp restrict_since(query, _), do: query
662 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
663 raise "Can't use the child object without preloading!"
666 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
668 [_activity, object] in query,
669 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
673 defp restrict_tag_reject(query, _), do: query
675 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
676 raise "Can't use the child object without preloading!"
679 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
681 [_activity, object] in query,
682 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
686 defp restrict_tag_all(query, _), do: query
688 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
689 raise "Can't use the child object without preloading!"
692 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
694 [_activity, object] in query,
695 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
699 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
701 [_activity, object] in query,
702 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
706 defp restrict_tag(query, _), do: query
708 defp restrict_recipients(query, [], _user), do: query
710 defp restrict_recipients(query, recipients, nil) do
711 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
714 defp restrict_recipients(query, recipients, user) do
717 where: fragment("? && ?", ^recipients, activity.recipients),
718 or_where: activity.actor == ^user.ap_id
# Query filter for the :local_only option.
# Only the exact value `local_only: true` triggers the filter, which keeps
# activities whose `local` column is true.
722 defp restrict_local(query, %{local_only: true}) do
723 from(activity in query, where: activity.local == true)
# Any other value (including false) or a missing key leaves the query unchanged.
726 defp restrict_local(query, _), do: query
# Query filter for the :actor_id option: keeps only activities whose `actor`
# column equals the given value. Callers in this file set :actor_id to a
# user's ap_id.
728 defp restrict_actor(query, %{actor_id: actor_id}) do
729 from(activity in query, where: activity.actor == ^actor_id)
# No :actor_id key: leave the query unchanged.
732 defp restrict_actor(query, _), do: query
# Query filter for the :type option, compared against the 'type' field of the
# activity's `data` column.
# A single binary type is matched with plain equality.
734 defp restrict_type(query, %{type: type}) when is_binary(type) do
735 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
# A non-binary :type is matched with ANY(...). Callers in this file pass a list
# of type strings such as ["Create", "Announce"].
738 defp restrict_type(query, %{type: type}) do
739 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
# No :type key: leave the query unchanged.
742 defp restrict_type(query, _), do: query
# Query filter for the :state option: keeps only activities whose data 'state'
# field equals the given value.
744 defp restrict_state(query, %{state: state}) do
745 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
# No :state key: leave the query unchanged.
748 defp restrict_state(query, _), do: query
750 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
752 [_activity, object] in query,
753 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
757 defp restrict_favorited_by(query, _), do: query
759 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
760 raise "Can't use the child object without preloading!"
763 defp restrict_media(query, %{only_media: true}) do
765 [activity, object] in query,
766 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
767 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
771 defp restrict_media(query, _), do: query
773 defp restrict_replies(query, %{exclude_replies: true}) do
775 [_activity, object] in query,
776 where: fragment("?->>'inReplyTo' is null", object.data)
780 defp restrict_replies(query, %{
781 reply_filtering_user: %User{} = user,
782 reply_visibility: "self"
785 [activity, object] in query,
788 "?->>'inReplyTo' is null OR ? = ANY(?)",
796 defp restrict_replies(query, %{
797 reply_filtering_user: %User{} = user,
798 reply_visibility: "following"
801 [activity, object] in query,
805 ?->>'type' != 'Create' -- This isn't a Create
806 OR ?->>'inReplyTo' is null -- this isn't a reply
807 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
808 -- unless they are the author (because authors
809 -- are also part of the recipients). This leads
810 -- to a bug that self-replies by friends won't
812 OR ? = ? -- The actor is us
816 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
825 defp restrict_replies(query, _), do: query
# Query filter for the :exclude_reblogs option.
# Only the exact value `exclude_reblogs: true` triggers the filter, which drops
# activities whose data 'type' field is 'Announce'.
827 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
828 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
# Any other value or a missing key leaves the query unchanged.
831 defp restrict_reblogs(query, _), do: query
833 defp restrict_muted(query, %{with_muted: true}), do: query
835 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
836 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
839 from([activity] in query,
840 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
843 "not (?->'to' \\?| ?) or ? = ?",
851 unless opts[:skip_preload] do
852 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
858 defp restrict_muted(query, _), do: query
860 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
861 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
862 domain_blocks = user.domain_blocks || []
864 following_ap_ids = User.get_friends_ap_ids(user)
867 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
870 [activity, object: o] in query,
871 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
874 "((not (? && ?)) or ? = ?)",
882 "recipients_contain_blocked_domains(?, ?) = false",
888 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
895 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
903 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
912 defp restrict_blocked(query, _), do: query
914 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
919 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
921 ^[Constants.as_public()]
926 defp restrict_unlisted(query, _), do: query
# Query filter for pinned activities.
# Applies only when opts contains both `pinned: true` and :pinned_activity_ids;
# it then keeps activities whose id is in that list.
# fetch_user_activities/3 in this file fills :pinned_activity_ids from
# user.pinned_activities.
928 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
929 from(activity in query, where: activity.id in ^ids)
# Either key missing, or :pinned not exactly true: leave the query unchanged.
932 defp restrict_pinned(query, _), do: query
934 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
935 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
941 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
949 defp restrict_muted_reblogs(query, _), do: query
951 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
954 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
958 defp restrict_instance(query, _), do: query
960 defp restrict_filtered(query, %{user: %User{} = user}) do
961 case Filter.compose_regex(user) do
966 from([activity, object] in query,
968 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
969 activity.actor == ^user.ap_id
974 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
975 restrict_filtered(query, %{user: user})
978 defp restrict_filtered(query, _), do: query
980 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
982 defp exclude_poll_votes(query, _) do
983 if has_named_binding?(query, :object) do
984 from([activity, object: o] in query,
985 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
992 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
994 defp exclude_chat_messages(query, _) do
995 if has_named_binding?(query, :object) do
996 from([activity, object: o] in query,
997 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1004 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1006 defp exclude_invisible_actors(query, _opts) do
1008 User.Query.build(%{invisible: true, select: [:ap_id]})
1010 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1012 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# Query filter for the :exclude_id option: drops the activity whose id equals
# the given value. The guard requires the id to be a binary.
1015 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1016 from(activity in query, where: activity.id != ^id)
# Missing or non-binary :exclude_id: leave the query unchanged.
1019 defp exclude_id(query, _), do: query
1021 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1023 defp maybe_preload_objects(query, _) do
1025 |> Activity.with_preloaded_object()
1028 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1030 defp maybe_preload_bookmarks(query, opts) do
1032 |> Activity.with_preloaded_bookmark(opts[:user])
1035 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1037 |> Activity.with_preloaded_report_notes()
1040 defp maybe_preload_report_notes(query, _), do: query
1042 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1044 defp maybe_set_thread_muted_field(query, opts) do
1046 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1049 defp maybe_order(query, %{order: :desc}) do
1051 |> order_by(desc: :id)
1054 defp maybe_order(query, %{order: :asc}) do
1056 |> order_by(asc: :id)
1059 defp maybe_order(query, _), do: query
1061 defp fetch_activities_query_ap_ids_ops(opts) do
1062 source_user = opts[:muting_user]
1063 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1065 ap_id_relationships =
1066 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1067 [:block | ap_id_relationships]
1072 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1074 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1075 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1077 restrict_muted_reblogs_opts =
1078 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1080 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1083 def fetch_activities_query(recipients, opts \\ %{}) do
1084 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1085 fetch_activities_query_ap_ids_ops(opts)
1088 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1092 |> maybe_preload_objects(opts)
1093 |> maybe_preload_bookmarks(opts)
1094 |> maybe_preload_report_notes(opts)
1095 |> maybe_set_thread_muted_field(opts)
1096 |> maybe_order(opts)
1097 |> restrict_recipients(recipients, opts[:user])
1098 |> restrict_replies(opts)
1099 |> restrict_tag(opts)
1100 |> restrict_tag_reject(opts)
1101 |> restrict_tag_all(opts)
1102 |> restrict_since(opts)
1103 |> restrict_local(opts)
1104 |> restrict_actor(opts)
1105 |> restrict_type(opts)
1106 |> restrict_state(opts)
1107 |> restrict_favorited_by(opts)
1108 |> restrict_blocked(restrict_blocked_opts)
1109 |> restrict_muted(restrict_muted_opts)
1110 |> restrict_filtered(opts)
1111 |> restrict_media(opts)
1112 |> restrict_visibility(opts)
1113 |> restrict_thread_visibility(opts, config)
1114 |> restrict_reblogs(opts)
1115 |> restrict_pinned(opts)
1116 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1117 |> restrict_instance(opts)
1118 |> restrict_announce_object_actor(opts)
1119 |> restrict_filtered(opts)
1120 |> Activity.restrict_deactivated_users()
1121 |> exclude_poll_votes(opts)
1122 |> exclude_chat_messages(opts)
1123 |> exclude_invisible_actors(opts)
1124 |> exclude_visibility(opts)
1127 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1128 list_memberships = Pleroma.List.memberships(opts[:user])
1130 fetch_activities_query(recipients ++ list_memberships, opts)
1131 |> Pagination.fetch_paginated(opts, pagination)
1133 |> maybe_update_cc(list_memberships, opts[:user])
1137 Fetches the user's favorite activities, ordered by when they were added to favorites.
1139 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1140 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1142 |> Activity.Queries.by_actor()
1143 |> Activity.Queries.by_type("Like")
1144 |> Activity.with_joined_object()
1145 |> Object.with_joined_activity()
1146 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1147 |> order_by([like, _, _], desc_nulls_last: like.id)
1148 |> Pagination.fetch_paginated(
1149 Map.merge(params, %{skip_order: true}),
1154 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1155 Enum.map(activities, fn
1156 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1157 if Enum.any?(bcc, &(&1 in list_memberships)) do
1158 update_in(activity.data["cc"], &[user_ap_id | &1])
1168 defp maybe_update_cc(activities, _, _), do: activities
1170 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1171 from(activity in query,
1173 fragment("? && ?", activity.recipients, ^recipients) or
1174 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1175 ^Constants.as_public() in activity.recipients)
1179 def fetch_activities_bounded(
1181 recipients_with_public,
1183 pagination \\ :keyset
1185 fetch_activities_query([], opts)
1186 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1187 |> Pagination.fetch_paginated(opts, pagination)
1191 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1192 def upload(file, opts \\ []) do
1193 with {:ok, data} <- Upload.store(file, opts) do
1194 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1196 Repo.insert(%Object{data: obj_data})
1200 @spec get_actor_url(any()) :: binary() | nil
1201 defp get_actor_url(url) when is_binary(url), do: url
1202 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1204 defp get_actor_url(url) when is_list(url) do
1210 defp get_actor_url(_url), do: nil
1212 defp object_to_user_data(data) do
1214 data["icon"]["url"] &&
1217 "url" => [%{"href" => data["icon"]["url"]}]
1221 data["image"]["url"] &&
1224 "url" => [%{"href" => data["image"]["url"]}]
1229 |> Map.get("attachment", [])
1230 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1231 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1235 |> Map.get("tag", [])
1237 %{"type" => "Emoji"} -> true
1240 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1241 {String.trim(name, ":"), url}
1244 is_locked = data["manuallyApprovesFollowers"] || false
1245 capabilities = data["capabilities"] || %{}
1246 accepts_chat_messages = capabilities["acceptsChatMessages"]
1247 data = Transmogrifier.maybe_fix_user_object(data)
1248 is_discoverable = data["discoverable"] || false
1249 invisible = data["invisible"] || false
1250 actor_type = data["type"] || "Person"
1253 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1254 data["publicKey"]["publicKeyPem"]
1260 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1261 data["endpoints"]["sharedInbox"]
1268 uri: get_actor_url(data["url"]),
1273 is_locked: is_locked,
1274 is_discoverable: is_discoverable,
1275 invisible: invisible,
1278 follower_address: data["followers"],
1279 following_address: data["following"],
1280 bio: data["summary"] || "",
1281 actor_type: actor_type,
1282 also_known_as: Map.get(data, "alsoKnownAs", []),
1283 public_key: public_key,
1284 inbox: data["inbox"],
1285 shared_inbox: shared_inbox,
1286 accepts_chat_messages: accepts_chat_messages
1289 # nickname can be nil because of virtual actors
1290 if data["preferredUsername"] do
1294 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1297 Map.put(user_data, :nickname, nil)
1301 def fetch_follow_information_for_user(user) do
1302 with {:ok, following_data} <-
1303 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1304 {:ok, hide_follows} <- collection_private(following_data),
1305 {:ok, followers_data} <-
1306 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1307 {:ok, hide_followers} <- collection_private(followers_data) do
1310 hide_follows: hide_follows,
1311 follower_count: normalize_counter(followers_data["totalItems"]),
1312 following_count: normalize_counter(following_data["totalItems"]),
1313 hide_followers: hide_followers
1316 {:error, _} = e -> e
# Normalizes a counter value: integers pass through unchanged and anything
# else (nil, strings, ...) becomes 0.
# fetch_follow_information_for_user/1 applies this to the "totalItems" field of
# the fetched followers and following collections.
1321 defp normalize_counter(counter) when is_integer(counter), do: counter
1322 defp normalize_counter(_), do: 0
1324 def maybe_update_follow_information(user_data) do
1325 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1326 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1328 {:collections_available,
1329 !!(user_data[:following_address] && user_data[:follower_address])},
1331 fetch_follow_information_for_user(user_data) do
1332 info = Map.merge(user_data[:info] || %{}, info)
1335 |> Map.put(:info, info)
1337 {:user_type_check, false} ->
1340 {:collections_available, false} ->
1343 {:enabled, false} ->
1348 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1355 defp collection_private(%{"first" => %{"type" => type}})
1356 when type in ["CollectionPage", "OrderedCollectionPage"],
1359 defp collection_private(%{"first" => first}) do
1360 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1361 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1364 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1365 {:error, _} = e -> e
1370 defp collection_private(_data), do: {:ok, true}
1372 def user_data_from_user_object(data) do
1373 with {:ok, data} <- MRF.filter(data) do
1374 {:ok, object_to_user_data(data)}
1380 def fetch_and_prepare_user_from_ap_id(ap_id) do
1381 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1382 {:ok, data} <- user_data_from_user_object(data) do
1383 {:ok, maybe_update_follow_information(data)}
1385 # If this has been deleted, only log a debug and not an error
1386 {:error, "Object has been deleted" = e} ->
1387 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1390 {:error, {:reject, reason} = e} ->
1391 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1395 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1400 def maybe_handle_clashing_nickname(data) do
1401 with nickname when is_binary(nickname) <- data[:nickname],
1402 %User{} = old_user <- User.get_by_nickname(nickname),
1403 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1405 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1411 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1412 |> User.update_and_set_cache()
1414 {:ap_id_comparison, true} ->
1416 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1424 def make_user_from_ap_id(ap_id) do
1425 user = User.get_cached_by_ap_id(ap_id)
1427 if user && !User.ap_enabled?(user) do
1428 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1430 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1433 |> User.remote_user_changeset(data)
1434 |> User.update_and_set_cache()
1436 maybe_handle_clashing_nickname(data)
1439 |> User.remote_user_changeset()
1447 def make_user_from_nickname(nickname) do
1448 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1449 make_user_from_ap_id(ap_id)
1451 _e -> {:error, "No AP id in WebFinger"}
1455 # filter out broken threads
# Returns the result of entire_thread_visible_for_user?/2 for the given
# activity and user. Both arguments must be structs (%Activity{} and %User{});
# no fallback clause is visible in this excerpt.
1456 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1457 entire_thread_visible_for_user?(activity, user)
1460 # do post-processing on a specific activity
# Public entry point; at present it only delegates to contain_broken_threads/2
# and returns that function's result. Requires an %Activity{} and a %User{}.
1461 def contain_activity(%Activity{} = activity, %User{} = user) do
1462 contain_broken_threads(activity, user)
1465 def fetch_direct_messages_query do
1467 |> restrict_type(%{type: "Create"})
1468 |> restrict_visibility(%{visibility: "direct"})
1469 |> order_by([activity], asc: activity.id)