1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
# Collects the recipients of an activity map from its "to", "cc" and "bcc"
# fields (each defaulting to []).
# - "Create" activities: the actor is appended and duplicates are removed.
# - Any other activity: the three lists are concatenated as-is (no de-duplication).
# Callers in this module destructure the result as `{recipients, _, _}`.
# NOTE(review): a missing "actor" defaults to [] and is then wrapped as [actor],
# which would add an empty list as a recipient element — confirm "actor" is
# always present for Create activities.
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
# Reports whether the given actor is allowed to act: a nil actor passes, and an
# actor that resolves to a cached %User{} passes only when it is not deactivated.
# NOTE(review): the branch for an actor without a cached user is not visible here.
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
# Returns true when the embedded object's content length (String.length/1) does
# not exceed the configured [:instance, :remote_limit]. Activities without
# non-nil object content always pass.
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
# Increments the actor's note count through User.increase_note_count/1 when
# `object` is public; otherwise returns `{:ok, actor}` without touching the count.
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
# Decrements the actor's note count through User.decrease_note_count/1 when
# `object` is public; otherwise returns `{:ok, actor}` without touching the count.
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
# When the create data embeds an object with an "inReplyTo" and that object is
# public, bumps the reply counter of the replied-to object via
# Object.increase_replies_count/1. Any other input is a `:noop`.
# NOTE(review): part of the matching head is not visible here.
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
# Persists an already-processed object or activity map.
# - Maps whose "type" is listed in @object_types are stored with Object.create/1.
# - Everything else is inserted as an %Activity{} whose recipients come from
#   get_recipients/1; afterwards maybe_create_activity_expiration/1 is applied.
# `meta` must contain :local for the activity clause (Keyword.fetch!/2 raises
# otherwise).
# NOTE(review): the activity clause visibly returns `{:ok, activity, meta}`, a
# 3-tuple, while the @spec declares a 2-tuple — confirm and align the spec.
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
# Inserts an activity map into the database after a chain of checks.
# Steps visible in the `with` chain, in order:
#   1. proceed only if Activity.normalize/1 finds no existing activity (an
#      existing %Activity{} is handled in the else branch);
#   2. fill in defaults (lazy_put_activity_defaults/2);
#   3. require an active actor unless `bypass_actor_check` is true;
#   4. enforce the remote content limit (failure -> {:error, :remote_limit});
#   5. run the MRF filters;
#   6. compute recipients;
#   7. stop here for `fake: true`, building an unsaved %Activity{} instead;
#   8. run the containment check, insert the embedded object, then the activity
#      (with expiration handling).
# After a real insert, a "fetch_data_for_activity" background job is enqueued;
# for a fake activity the rich media helper is called directly.
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
130 %Activity{} = activity ->
136 {:containment, _} = error ->
139 {:error, _} = error ->
142 {:fake, true, map, recipients} ->
143 activity = %Activity{
147 recipients: recipients,
151 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
154 {:remote_limit_pass, _} ->
155 {:error, :remote_limit}
# Inserts `struct` with Repo.insert/1 and, on success, passes the stored activity
# to maybe_create_activity_expiration/1. The visible struct fields are the actor
# (from data["actor"]) and the recipients.
# NOTE(review): the construction of `struct` is only partly visible here.
162 defp insert_activity_with_expiration(data, local, recipients) do
166 actor: data["actor"],
167 recipients: recipients
170 with {:ok, activity} <- Repo.insert(struct) do
171 maybe_create_activity_expiration(activity)
# Post-insert side effects for an activity: creates notifications, creates or
# bumps the conversation on behalf of the activity's actor, and streams the
# conversation's participations.
175 def notify_and_stream(activity) do
176 Notification.create_notifications(activity)
178 conversation = create_or_bump_conversation(activity, activity.actor)
179 participations = get_participations(conversation)
181 stream_out_participations(participations)
# Enqueues a Pleroma.Workers.PurgeExpiredActivity job (activity id + expiry time)
# when the activity data carries a %DateTime{} under "expires_at". Activities
# without such a value are returned unchanged as `{:ok, activity}`.
184 defp maybe_create_activity_expiration(
185 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
189 activity_id: activity.id,
190 expires_at: expires_at
196 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity` and, when `actor` resolves to
# a cached %User{}, calls Participation.mark_as_read/2 for that user and the
# conversation.
198 defp create_or_bump_conversation(activity, actor) do
199 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
200 %User{} = user <- User.get_cached_by_ap_id(actor) do
201 Participation.mark_as_read(user, conversation)
# Given `{:ok, conversation}`, force-reloads the :participations association and
# returns it. Any other input yields an empty list.
206 defp get_participations({:ok, conversation}) do
208 |> Repo.preload(:participations, force: true)
209 |> Map.get(:participations)
212 defp get_participations(_), do: []
# Preloads the :user association of the given participations and pushes them to
# the "participation" stream topic.
214 def stream_out_participations(participations) do
217 |> Repo.preload(:user)
219 Streamer.stream("participation", participations)
# Two-argument variant: looks up the conversation for the object's "context",
# preloads its participations and re-streams them when `last_activity_id` is
# truthy. Inputs that are not an %Object{} with a "context" are a `:noop`.
# NOTE(review): the binding of `last_activity_id` is not visible; presumably it
# is the result of fetch_latest_direct_activity_id_for_context/2 — confirm.
222 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
223 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
224 conversation = Repo.preload(conversation, :participations)
227 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
232 if last_activity_id do
233 stream_out_participations(conversation.participations)
238 def stream_out_participations(_, _), do: :noop
# Streams "Create", "Announce" and "Delete" activities to the topics computed by
# Topics.get_activity_topics/1. A catch-all clause handles every other input.
# NOTE(review): the body of the catch-all clause is not visible here.
240 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
241 when data_type in ["Create", "Announce", "Delete"] do
243 |> Topics.get_activity_topics()
244 |> Streamer.stream(activity)
247 def stream_out(_activity) do
# Public entry point for creating a "Create" activity: runs do_create/2 inside a
# database transaction so that a failure can roll everything back.
251 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
252 def create(params, fake \\ false) do
253 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
# Transaction body of create/2.
# `local` is true unless params[:local] is exactly false. `quick_insert?` is true
# only when the :env config is :benchmark.
# Flow of the `with` chain: insert the create data; for `fake` stop right after
# the insert; bump the reply count of the parent; in benchmark mode stop here;
# otherwise increase the actor's note count, notify/stream, and federate.
# A failing step ends in Repo.rollback/1.
258 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
259 additional = params[:additional] || %{}
260 # only accept false as false value
261 local = !(params[:local] == false)
262 published = params[:published]
263 quick_insert? = Config.get([:env]) == :benchmark
267 %{to: to, actor: actor, published: published, context: context, object: object},
271 with {:ok, activity} <- insert(create_data, local, fake),
272 {:fake, false, activity} <- {:fake, fake, activity},
273 _ <- increase_replies_count_if_reply(create_data),
274 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
275 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
276 _ <- notify_and_stream(activity),
277 :ok <- maybe_federate(activity) do
280 {:quick_insert, true, activity} ->
283 {:fake, true, activity} ->
287 Repo.rollback(message)
# Creates a listen activity: builds `listen_data` from the given params, inserts
# it, triggers notifications/streaming and federates it.
# `local` is true unless params[:local] is exactly false.
# NOTE(review): the construction of `listen_data` is only partly visible here.
291 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
292 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
293 additional = params[:additional] || %{}
294 # only accept false as false value
295 local = !(params[:local] == false)
296 published = params[:published]
300 %{to: to, actor: actor, published: published, context: context, object: object},
304 with {:ok, activity} <- insert(listen_data, local),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_federate(activity) do
# Public entry point for unfollowing: runs do_unfollow/4 inside a database
# transaction. Per the @spec the result may also be nil.
311 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
312 {:ok, Activity.t()} | nil | {:error, any()}
313 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
314 with {:ok, result} <-
315 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
# Transaction body of unfollow/4: finds the latest follow activity between the
# two users, marks it "cancelled", inserts the unfollow activity built by
# make_unfollow_data/4, then notifies/streams and federates it.
# An `{:error, error}` from any step triggers Repo.rollback/1.
320 defp do_unfollow(follower, followed, activity_id, local) do
321 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
322 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
323 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
324 {:ok, activity} <- insert(unfollow_data, local),
325 _ <- notify_and_stream(activity),
326 :ok <- maybe_federate(activity) do
330 {:error, error} -> Repo.rollback(error)
# Report ("Flag") creation. flag/1 wraps do_flag/1 in a database transaction.
# Visible do_flag steps: `local` and `forward` are true unless the corresponding
# param is exactly false; the additional data gets an empty "to" and a "cc" that
# either contains the reported account's ap_id or is empty; the flag data is
# inserted, a stripped copy is produced by strip_report_status_data/1, the
# activity is notified/streamed, the stripped copy is federated, and a report
# email is sent asynchronously to every superuser that has an email address.
# An `{:error, error}` result triggers Repo.rollback/1.
# NOTE(review): the function heads and the condition choosing between the two
# "cc" variants are not visible here (presumably `forward`) — confirm.
334 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
336 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
350 # only accept false as false value
351 local = !(params[:local] == false)
352 forward = !(params[:forward] == false)
354 additional = params[:additional] || %{}
358 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
360 Map.merge(additional, %{"to" => [], "cc" => []})
363 with flag_data <- make_flag_data(params, additional),
364 {:ok, activity} <- insert(flag_data, local),
365 {:ok, stripped_activity} <- strip_report_status_data(activity),
366 _ <- notify_and_stream(activity),
368 maybe_federate(stripped_activity) do
369 User.all_superusers()
370 |> Enum.filter(fn user -> not is_nil(user.email) end)
371 |> Enum.each(fn superuser ->
373 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
374 |> Pleroma.Emails.Mailer.deliver_async()
379 {:error, error} -> Repo.rollback(error)
# Announces an account move from `origin` to `target`.
# The move is only performed when the origin's ap_id is listed in the target's
# `also_known_as`; otherwise an error tuple with an explanatory message is
# returned. On success the activity is inserted, notified/streamed and federated,
# and a "move_following" background job is enqueued with both user ids.
383 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
384 def move(%User{} = origin, %User{} = target, local \\ true) do
387 "actor" => origin.ap_id,
388 "object" => origin.ap_id,
389 "target" => target.ap_id
392 with true <- origin.ap_id in target.also_known_as,
393 {:ok, activity} <- insert(params, local),
394 _ <- notify_and_stream(activity) do
395 maybe_federate(activity)
397 BackgroundWorker.enqueue("move_following", %{
398 "origin_id" => origin.id,
399 "target_id" => target.id
404 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the Ecto query for the activities of one conversation context.
# The query optionally preloads objects/bookmarks and the thread-muted flag, then
# applies block, recipient and filter restrictions, matches on activity type and
# context via a JSON fragment, excludes poll votes, and orders by id descending.
# One visible recipients variant is the user's ap_id plus their followings plus
# the public address.
# NOTE(review): the condition selecting the recipients variant and the fragment
# arguments are not visible here.
409 def fetch_activities_for_context_query(context, opts) do
410 public = [Constants.as_public()]
414 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
417 from(activity in Activity)
418 |> maybe_preload_objects(opts)
419 |> maybe_preload_bookmarks(opts)
420 |> maybe_set_thread_muted_field(opts)
421 |> restrict_blocked(opts)
422 |> restrict_recipients(recipients, opts[:user])
423 |> restrict_filtered(opts)
427 "?->>'type' = ? and ?->>'context' = ?",
434 |> exclude_poll_votes(opts)
436 |> order_by([activity], desc: activity.id)
# Returns the activities of a conversation context, based on
# fetch_activities_for_context_query/2.
# NOTE(review): the steps after building the query are not visible here.
439 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
440 def fetch_activities_for_context(context, opts \\ %{}) do
442 |> fetch_activities_for_context_query(opts)
# Looks up the id of the latest direct-visibility activity in a context (nil when
# there is none, per the @spec). Builds on fetch_activities_for_context_query/2
# with preloading skipped by default — `skip_preload: true` is the left-hand side
# of Map.merge/2, so a caller-supplied :skip_preload in `opts` takes precedence —
# and restricts the result to "direct" visibility.
446 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
447 FlakeId.Ecto.CompatType.t() | nil
448 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
450 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
451 |> restrict_visibility(%{visibility: "direct"})
# Fetches a page of activities addressed to the public collection. The :user
# option is dropped before querying; restrict_unlisted/2 additionally removes
# unlisted activities when opts request it.
457 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
458 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
459 opts = Map.delete(opts, :user)
461 [Constants.as_public()]
462 |> fetch_activities_query(opts)
463 |> restrict_unlisted(opts)
464 |> Pagination.fetch_paginated(opts, pagination)
# Same as fetch_public_or_unlisted_activities/2 but with `restrict_unlisted: true`
# forced into the options, so unlisted activities are excluded.
467 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
470 |> Map.put(:restrict_unlisted, true)
471 |> fetch_public_or_unlisted_activities(pagination)
# Restricts a query to activities of the requested visibility, using the
# `activity_visibility` SQL function.
# - list of visibilities: applied with `= ANY` when every entry is valid,
#   otherwise an error is logged;
# - single valid visibility: applied with `=`;
# - single invalid visibility: an error is logged;
# - no :visibility key: query returned unchanged.
# NOTE(review): what the error branches return is not visible here.
474 @valid_visibilities ~w[direct unlisted public private]
476 defp restrict_visibility(query, %{visibility: visibility})
477 when is_list(visibility) do
478 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
483 "activity_visibility(?, ?, ?) = ANY (?)",
491 Logger.error("Could not restrict visibility to #{visibility}")
495 defp restrict_visibility(query, %{visibility: visibility})
496 when visibility in @valid_visibilities do
500 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
504 defp restrict_visibility(_query, %{visibility: visibility})
505 when visibility not in @valid_visibilities do
506 Logger.error("Could not restrict visibility to #{visibility}")
509 defp restrict_visibility(query, _visibility), do: query
# Counterpart of restrict_visibility/2 driven by the :exclude_visibilities
# option. Accepts a list (validated entry by entry) or a single visibility; an
# invalid non-nil value logs an error; without the option the query is returned
# unchanged.
# NOTE(review): the negation around the SQL fragments and the return values of
# the error branches are not visible here.
511 defp exclude_visibility(query, %{exclude_visibilities: visibility})
512 when is_list(visibility) do
513 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
518 "activity_visibility(?, ?, ?) = ANY (?)",
526 Logger.error("Could not exclude visibility to #{visibility}")
531 defp exclude_visibility(query, %{exclude_visibilities: visibility})
532 when visibility in @valid_visibilities do
537 "activity_visibility(?, ?, ?) = ?",
546 defp exclude_visibility(query, %{exclude_visibilities: visibility})
547 when visibility not in [nil | @valid_visibilities] do
548 Logger.error("Could not exclude visibility to #{visibility}")
552 defp exclude_visibility(query, _visibility), do: query
# Thread containment: for a reading user with an ap_id, keeps only activities for
# which the `thread_visibility` SQL function returns true. Separate clauses match
# the instance-wide and per-user `skip_thread_containment: true` settings;
# without a user the query is returned unchanged.
# NOTE(review): the bodies of the two skip clauses are not visible — presumably
# they return the query untouched.
554 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
557 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
560 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
563 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
567 defp restrict_thread_visibility(query, _, _), do: query
# Fetches activities authored by `user` (actor_id = user.ap_id) as seen by
# `reading_user`. Recipients are derived by user_activities_recipients/1 from the
# :godmode param and the reading user. No activity type restriction is visible in
# this variant.
569 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
572 |> Map.put(:user, reading_user)
573 |> Map.put(:actor_id, user.ap_id)
576 godmode: params[:godmode],
577 reading_user: reading_user
579 |> user_activities_recipients()
580 |> fetch_activities(params)
# Fetches the "Create" and "Announce" activities authored by `user` as seen by
# `reading_user`, passing along the user's pinned activity ids. Depending on
# whether the reading user blocks the author, :blocking_user and :muting_user are
# set to the reading user.
# NOTE(review): which branch of the User.blocks?/2 check sets those options is
# not visible here.
584 def fetch_user_activities(user, reading_user, params \\ %{}) do
587 |> Map.put(:type, ["Create", "Announce"])
588 |> Map.put(:user, reading_user)
589 |> Map.put(:actor_id, user.ap_id)
590 |> Map.put(:pinned_activity_ids, user.pinned_activities)
593 if User.blocks?(reading_user, user) do
597 |> Map.put(:blocking_user, reading_user)
598 |> Map.put(:muting_user, reading_user)
602 godmode: params[:godmode],
603 reading_user: reading_user
605 |> user_activities_recipients()
606 |> fetch_activities(params)
# Fetches "Create" and "Announce" activities visible to `reading_user` (or all of
# them in godmode), using offset-based pagination.
610 def fetch_statuses(reading_user, params) do
611 params = Map.put(params, :type, ["Create", "Announce"])
614 godmode: params[:godmode],
615 reading_user: reading_user
617 |> user_activities_recipients()
618 |> fetch_activities(params, :offset)
# Computes the recipient list used to scope a user's activities.
# - godmode: [] (restrict_recipients/3 leaves the query untouched for []).
# - otherwise: either the public address plus the reading user's ap_id and
#   followings, or just the public address.
# NOTE(review): the condition choosing between the two lists is not visible —
# presumably whether `reading_user` is set.
622 defp user_activities_recipients(%{godmode: true}), do: []
624 defp user_activities_recipients(%{reading_user: reading_user}) do
626 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
628 [Constants.as_public()]
# Filters on the actor of the object joined to the query, driven by the
# :announce_filtering_user option. Needs the joined object, so it raises when
# combined with `skip_preload: true`. Without the option the query is unchanged.
# NOTE(review): the fragment arguments are not visible; presumably this drops
# Announces of the filtering user's own objects — confirm.
632 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
633 raise "Can't use the child object without preloading!"
636 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
638 [activity, object] in query,
641 "?->>'type' != ? or ?->>'actor' != ?",
650 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than :since_id. An empty-string
# since_id or a missing option leaves the query unchanged.
652 defp restrict_since(query, %{since_id: ""}), do: query
654 defp restrict_since(query, %{since_id: since_id}) do
655 from(activity in query, where: activity.id > ^since_id)
658 defp restrict_since(query, _), do: query
# Excludes activities whose object's 'tag' contains any of the :tag_reject values
# (jsonb `?|` operator, escaped for Ecto). Requires the joined object, so it
# raises when used with `skip_preload: true`. An empty or missing :tag_reject
# leaves the query unchanged.
660 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
661 raise "Can't use the child object without preloading!"
664 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
666 [_activity, object] in query,
667 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
671 defp restrict_tag_reject(query, _), do: query
# Keeps only activities whose object's 'tag' contains every one of the :tag_all
# values (jsonb `?&` operator, escaped for Ecto). Requires the joined object, so
# it raises when used with `skip_preload: true`. An empty or missing :tag_all
# leaves the query unchanged.
673 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
674 raise "Can't use the child object without preloading!"
677 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
679 [_activity, object] in query,
680 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
684 defp restrict_tag_all(query, _), do: query
# Keeps only activities whose object's 'tag' contains the requested tag(s):
# - list: any of the tags (jsonb `?|`);
# - binary: exactly that tag (jsonb `?`).
# Requires the joined object, so it raises when used with `skip_preload: true`.
# Without a :tag option the query is unchanged.
686 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
690 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
692 [_activity, object] in query,
693 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
697 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
699 [_activity, object] in query,
700 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
704 defp restrict_tag(query, _), do: query
# Restricts activities by recipient overlap (Postgres array `&&`).
# - empty recipient list: no restriction at all;
# - no user: recipients must overlap the activity's recipients;
# - with a user: same overlap condition, OR the activity was authored by the user.
706 defp restrict_recipients(query, [], _user), do: query
708 defp restrict_recipients(query, recipients, nil) do
709 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
712 defp restrict_recipients(query, recipients, user) do
715 where: fragment("? && ?", ^recipients, activity.recipients),
716 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities flagged as local; otherwise the
# query is unchanged.
720 defp restrict_local(query, %{local_only: true}) do
721 from(activity in query, where: activity.local == true)
724 defp restrict_local(query, _), do: query
# With an :actor_id option, keeps only activities authored by that actor;
# otherwise the query is unchanged.
726 defp restrict_actor(query, %{actor_id: actor_id}) do
727 from(activity in query, where: activity.actor == ^actor_id)
730 defp restrict_actor(query, _), do: query
# Restricts on the activity's JSON 'type': a binary is compared with `=`, any
# other value (expected to be a list) with `= ANY`. Without a :type option the
# query is unchanged.
732 defp restrict_type(query, %{type: type}) when is_binary(type) do
733 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
736 defp restrict_type(query, %{type: type}) do
737 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
740 defp restrict_type(query, _), do: query
# With a :state option, keeps only activities whose JSON 'state' equals it;
# otherwise the query is unchanged.
742 defp restrict_state(query, %{state: state}) do
743 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
746 defp restrict_state(query, _), do: query
# With a :favorited_by ap_id, keeps only activities whose joined object lists
# that ap_id in its 'likes'; otherwise the query is unchanged.
748 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
750 [_activity, object] in query,
751 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
755 defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps only "Create" activities whose object's
# 'attachment' is not the empty list. Requires the joined object, so it raises
# when :only_media is combined with `skip_preload: true`. Otherwise the query is
# unchanged.
757 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
758 raise "Can't use the child object without preloading!"
761 defp restrict_media(query, %{only_media: true}) do
763 [activity, object] in query,
764 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
765 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
769 defp restrict_media(query, _), do: query
# Reply filtering.
# - `exclude_replies: true`: keeps only objects without 'inReplyTo'.
# - reply_visibility "self" (with a filtering user): keeps non-replies plus
#   entries matching an `= ANY` membership test (arguments not visible here).
# - reply_visibility "following": keeps non-Create activities, non-replies,
#   replies addressed to the user or their friends, and the user's own
#   activities; see the inline SQL comments for the known limitation.
# - otherwise the query is unchanged.
771 defp restrict_replies(query, %{exclude_replies: true}) do
773 [_activity, object] in query,
774 where: fragment("?->>'inReplyTo' is null", object.data)
778 defp restrict_replies(query, %{
779 reply_filtering_user: %User{} = user,
780 reply_visibility: "self"
783 [activity, object] in query,
786 "?->>'inReplyTo' is null OR ? = ANY(?)",
794 defp restrict_replies(query, %{
795 reply_filtering_user: %User{} = user,
796 reply_visibility: "following"
799 [activity, object] in query,
803 ?->>'type' != 'Create' -- This isn't a Create
804 OR ?->>'inReplyTo' is null -- this isn't a reply
805 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
806 -- unless they are the author (because authors
807 -- are also part of the recipients). This leads
808 -- to a bug that self-replies by friends won't
810 OR ? = ? -- The actor is us
814 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
823 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities of type 'Announce'; otherwise
# the query is unchanged.
825 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
826 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
829 defp restrict_reblogs(query, _), do: query
# Hides content from muted users for the :muting_user.
# The mute list comes from opts[:muted_users_ap_ids] when supplied, otherwise it
# is loaded with User.muted_users_ap_ids/1. Activities authored by, or addressed
# ('to') to, a muted user are dropped; unless preloading is skipped, rows with a
# thread mute (non-nil tm.user_id) are dropped as well.
# `with_muted: true` or a missing muting user leaves the query unchanged.
831 defp restrict_muted(query, %{with_muted: true}), do: query
833 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
834 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
837 from([activity] in query,
838 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
839 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
842 unless opts[:skip_preload] do
843 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
849 defp restrict_muted(query, _), do: query
# Hides content related to users and domains blocked by the :blocking_user.
# The blocked ap_ids come from opts[:blocked_users_ap_ids] when supplied,
# otherwise from User.blocked_users_ap_ids/1; domain blocks default to [].
# The object is joined on demand when the query has no :object binding yet.
# Visible conditions: actor not blocked; recipient overlap checks; the
# `recipients_contain_blocked_domains` SQL function; Announces addressed to
# blocked users; and host-based checks (split_part of the actor URL) with an
# `= ANY` exemption.
# NOTE(review): the fragment arguments are not visible; the exemption presumably
# uses the friends list loaded via User.get_friends_ap_ids/1 — confirm.
851 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
852 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
853 domain_blocks = user.domain_blocks || []
855 following_ap_ids = User.get_friends_ap_ids(user)
858 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
861 [activity, object: o] in query,
862 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
865 "((not (? && ?)) or ? = ?)",
873 "recipients_contain_blocked_domains(?, ?) = false",
879 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
886 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
894 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
903 defp restrict_blocked(query, _), do: query
# With `restrict_unlisted: true`, drops activities whose 'cc' (treated as empty
# when missing) contains the public address; otherwise the query is unchanged.
905 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
910 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
912 ^[Constants.as_public()]
917 defp restrict_unlisted(query, _), do: query
# With `pinned: true` and a :pinned_activity_ids list, keeps only activities
# whose id is in that list; otherwise the query is unchanged.
919 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
920 from(activity in query, where: activity.id in ^ids)
923 defp restrict_pinned(query, _), do: query
# Hides Announce activities from users whose reblogs the :muting_user has muted.
# The list comes from opts[:reblog_muted_users_ap_ids] when supplied, otherwise
# from User.reblog_muted_users_ap_ids/1. Without a muting user the query is
# unchanged.
925 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
926 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
932 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
940 defp restrict_muted_reblogs(query, _), do: query
# With an :instance option, keeps only activities whose actor is in `users`,
# which is built from a query matching nicknames that end in "@<instance>".
# Without the option the query is unchanged.
# NOTE(review): how `users` is selected from that query is not visible here.
942 defp restrict_instance(query, %{instance: instance}) do
947 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
951 from(activity in query, where: activity.actor in ^users)
954 defp restrict_instance(query, _), do: query
# Applies the user's content filters: based on the regex produced by
# Filter.compose_regex/1, keeps activities whose object 'content' does not match
# (case-insensitive `~*`) or that were authored by the user. A :blocking_user is
# accepted as an alternative to :user. Without either the query is unchanged.
# NOTE(review): the case branches around the regex are not visible here.
956 defp restrict_filtered(query, %{user: %User{} = user}) do
957 case Filter.compose_regex(user) do
962 from([activity, object] in query,
964 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
965 activity.actor == ^user.ap_id
970 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
971 restrict_filtered(query, %{user: user})
974 defp restrict_filtered(query, _), do: query
# Unless `include_poll_votes: true` is given, drops activities whose joined
# object is of type "Answer". The filter is only applied when the query has a
# named :object binding.
# NOTE(review): the branch for a query without that binding is not visible here.
976 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
978 defp exclude_poll_votes(query, _) do
979 if has_named_binding?(query, :object) do
980 from([activity, object: o] in query,
981 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
# Unless `include_chat_messages: true` is given, drops activities whose joined
# object is of type "ChatMessage". The filter is only applied when the query has
# a named :object binding.
# NOTE(review): the branch for a query without that binding is not visible here.
988 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
990 defp exclude_chat_messages(query, _) do
991 if has_named_binding?(query, :object) do
992 from([activity, object: o] in query,
993 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
# Unless `invisible_actors: true` is given, drops activities authored by users
# flagged as invisible. The invisible ap_ids are collected up front from
# User.Query and passed to the query as a list.
1000 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1002 defp exclude_invisible_actors(query, _opts) do
1004 User.Query.build(%{invisible: true, select: [:ap_id]})
1006 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1008 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
# With a binary :exclude_id, drops the activity with that id; otherwise the query
# is unchanged.
1011 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1012 from(activity in query, where: activity.id != ^id)
1015 defp exclude_id(query, _), do: query
# Adds the object preload (Activity.with_preloaded_object/1) unless
# `skip_preload: true` is set.
1017 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1019 defp maybe_preload_objects(query, _) do
1021 |> Activity.with_preloaded_object()
# Adds the bookmark preload for opts[:user] (Activity.with_preloaded_bookmark/2)
# unless `skip_preload: true` is set.
1024 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1026 defp maybe_preload_bookmarks(query, opts) do
1028 |> Activity.with_preloaded_bookmark(opts[:user])
# Adds the report notes preload (Activity.with_preloaded_report_notes/1) only
# when `preload_report_notes: true` is set.
1031 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1033 |> Activity.with_preloaded_report_notes()
1036 defp maybe_preload_report_notes(query, _), do: query
# Populates the thread-muted field for opts[:muting_user], falling back to
# opts[:user], unless `skip_preload: true` is set.
1038 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1040 defp maybe_set_thread_muted_field(query, opts) do
1042 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
# Orders by id descending or ascending when opts[:order] is :desc or :asc; any
# other value leaves the query's ordering untouched.
1045 defp maybe_order(query, %{order: :desc}) do
1047 |> order_by(desc: :id)
1050 defp maybe_order(query, %{order: :asc}) do
1052 |> order_by(asc: :id)
1055 defp maybe_order(query, _), do: query
# Preloads the mute / reblog-mute (and, when the blocking user equals the muting
# user, block) relationship ap_ids of opts[:muting_user] in one call, and returns
# three option maps `{blocked_opts, muted_opts, muted_reblogs_opts}` for the
# corresponding restrict_* functions.
# The preloaded lists are the left-hand side of Map.merge/2, so values already
# present in `opts` under the same keys take precedence.
1057 defp fetch_activities_query_ap_ids_ops(opts) do
1058 source_user = opts[:muting_user]
1059 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1061 ap_id_relationships =
1062 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1063 [:block | ap_id_relationships]
1068 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1070 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1071 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1073 restrict_muted_reblogs_opts =
1074 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1076 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
# Builds the main timeline query for `recipients`, driven by `opts`.
# The pipeline first applies optional preloads and ordering, then every
# restrict_* filter, and finally the exclude_* filters. Block, mute and
# reblog-mute filters receive option maps enriched with preloaded relationship
# ap_ids (see fetch_activities_query_ap_ids_ops/1). Thread containment reads the
# instance-wide [:instance, :skip_thread_containment] setting.
# NOTE(review): restrict_filtered(opts) appears twice in this pipeline; the
# second application looks redundant — confirm and drop one.
1079 def fetch_activities_query(recipients, opts \\ %{}) do
1080 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1081 fetch_activities_query_ap_ids_ops(opts)
1084 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1088 |> maybe_preload_objects(opts)
1089 |> maybe_preload_bookmarks(opts)
1090 |> maybe_preload_report_notes(opts)
1091 |> maybe_set_thread_muted_field(opts)
1092 |> maybe_order(opts)
1093 |> restrict_recipients(recipients, opts[:user])
1094 |> restrict_replies(opts)
1095 |> restrict_tag(opts)
1096 |> restrict_tag_reject(opts)
1097 |> restrict_tag_all(opts)
1098 |> restrict_since(opts)
1099 |> restrict_local(opts)
1100 |> restrict_actor(opts)
1101 |> restrict_type(opts)
1102 |> restrict_state(opts)
1103 |> restrict_favorited_by(opts)
1104 |> restrict_blocked(restrict_blocked_opts)
1105 |> restrict_muted(restrict_muted_opts)
1106 |> restrict_filtered(opts)
1107 |> restrict_media(opts)
1108 |> restrict_visibility(opts)
1109 |> restrict_thread_visibility(opts, config)
1110 |> restrict_reblogs(opts)
1111 |> restrict_pinned(opts)
1112 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1113 |> restrict_instance(opts)
1114 |> restrict_announce_object_actor(opts)
1115 |> restrict_filtered(opts)
1116 |> Activity.restrict_deactivated_users()
1117 |> exclude_poll_votes(opts)
1118 |> exclude_chat_messages(opts)
1119 |> exclude_invisible_actors(opts)
1120 |> exclude_visibility(opts)
# Fetches one page of activities for `recipients`. The list memberships of
# opts[:user] are appended to the recipients before querying, and the fetched
# activities are post-processed by maybe_update_cc/3.
1123 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1124 list_memberships = Pleroma.List.memberships(opts[:user])
1126 fetch_activities_query(recipients ++ list_memberships, opts)
1127 |> Pagination.fetch_paginated(opts, pagination)
1129 |> maybe_update_cc(list_memberships, opts[:user])
1133 Fetches the activities a user has favourited, sorted by the order in which they were added to favourites.
# Starts from the user's "Like" activities, joins the liked object and that
# object's activity, and selects the latter with the object attached.
# `pagination_id` is set to the Like's id and the result is ordered by it
# (descending, nulls last), so paging follows the likes rather than the liked
# activities. `skip_order: true` is passed to the paginator.
1135 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1136 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1138 |> Activity.Queries.by_actor()
1139 |> Activity.Queries.by_type("Like")
1140 |> Activity.with_joined_object()
1141 |> Object.with_joined_activity()
1142 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1143 |> order_by([like, _, _], desc_nulls_last: like.id)
1144 |> Pagination.fetch_paginated(
1145 Map.merge(params, %{skip_order: true}),
# For a user with list memberships: every activity with a non-empty "bcc" that
# contains one of those memberships gets the user's ap_id prepended to its "cc".
# With no memberships or no user, the activities are returned unchanged.
# NOTE(review): the fallback branches inside the mapper are not visible here.
1150 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1151 Enum.map(activities, fn
1152 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1153 if Enum.any?(bcc, &(&1 in list_memberships)) do
1154 update_in(activity.data["cc"], &[user_ap_id | &1])
1164 defp maybe_update_cc(activities, _, _), do: activities
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
1166 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1167 from(activity in query,
1169 fragment("? && ?", activity.recipients, ^recipients) or
1170 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1171 ^Constants.as_public() in activity.recipients)
# Fetches one page of activities using the bounded recipient rules of
# fetch_activities_bounded_query/3. The base query is built with an empty
# recipient list, so only the bounded condition restricts recipients.
1175 def fetch_activities_bounded(
1177 recipients_with_public,
1179 pagination \\ :keyset
1181 fetch_activities_query([], opts)
1182 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1183 |> Pagination.fetch_paginated(opts, pagination)
# Stores an uploaded file via Upload.store/2 and inserts an %Object{} holding the
# resulting data; opts[:actor] is added under "actor" when present.
1187 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1188 def upload(file, opts \\ []) do
1189 with {:ok, data} <- Upload.store(file, opts) do
1190 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1192 Repo.insert(%Object{data: obj_data})
# Extracts a URL string from an actor's "url" value: a binary is returned as-is,
# a map yields its binary "href", a list gets dedicated handling, and anything
# else yields nil.
# NOTE(review): the body of the list clause is not visible here.
1196 @spec get_actor_url(any()) :: binary() | nil
1197 defp get_actor_url(url) when is_binary(url), do: url
1198 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1200 defp get_actor_url(url) when is_list(url) do
1206 defp get_actor_url(_url), do: nil
# Converts a remote actor object (ActivityPub JSON map) into the attribute map
# used for user changesets.
# Visible derivations: avatar/banner from "icon"/"image" URLs; profile fields
# from "attachment" entries of type "PropertyValue"; emoji from "tag" entries of
# type "Emoji" (name trimmed of ":"); locked/discoverable/invisible flags
# defaulting to false; actor type defaulting to "Person"; public key and shared
# inbox only when present as binaries in maps; bio defaulting to "". The nickname
# is "<preferredUsername>@<host of id>", or nil when no preferredUsername is
# given.
# NOTE(review): the PropertyValue filter uses a single-clause fn matching
# %{"type" => t}; an attachment entry without a "type" key would raise
# FunctionClauseError — confirm the input is sanitised upstream.
1208 defp object_to_user_data(data) do
1210 data["icon"]["url"] &&
1213 "url" => [%{"href" => data["icon"]["url"]}]
1217 data["image"]["url"] &&
1220 "url" => [%{"href" => data["image"]["url"]}]
1225 |> Map.get("attachment", [])
1226 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1227 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1231 |> Map.get("tag", [])
1233 %{"type" => "Emoji"} -> true
1236 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1237 {String.trim(name, ":"), url}
1240 is_locked = data["manuallyApprovesFollowers"] || false
1241 capabilities = data["capabilities"] || %{}
1242 accepts_chat_messages = capabilities["acceptsChatMessages"]
1243 data = Transmogrifier.maybe_fix_user_object(data)
1244 discoverable = data["discoverable"] || false
1245 invisible = data["invisible"] || false
1246 actor_type = data["type"] || "Person"
1249 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1250 data["publicKey"]["publicKeyPem"]
1256 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1257 data["endpoints"]["sharedInbox"]
1264 uri: get_actor_url(data["url"]),
1269 is_locked: is_locked,
1270 discoverable: discoverable,
1271 invisible: invisible,
1274 follower_address: data["followers"],
1275 following_address: data["following"],
1276 bio: data["summary"] || "",
1277 actor_type: actor_type,
1278 also_known_as: Map.get(data, "alsoKnownAs", []),
1279 public_key: public_key,
1280 inbox: data["inbox"],
1281 shared_inbox: shared_inbox,
1282 accepts_chat_messages: accepts_chat_messages
1285 # nickname can be nil because of virtual actors
1286 if data["preferredUsername"] do
1290 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1293 Map.put(user_data, :nickname, nil)
# Fetches the remote following and followers collections of `user` and derives
# the follow counters ("totalItems", normalised to an integer) together with the
# hide_follows / hide_followers flags (via collection_private/1).
# `{:error, _}` results are passed through unchanged.
1297 def fetch_follow_information_for_user(user) do
1298 with {:ok, following_data} <-
1299 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
1302 {:ok, hide_follows} <- collection_private(following_data),
1303 {:ok, followers_data} <-
1304 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
1305 {:ok, hide_followers} <- collection_private(followers_data) do
1308 hide_follows: hide_follows,
1309 follower_count: normalize_counter(followers_data["totalItems"]),
1310 following_count: normalize_counter(following_data["totalItems"]),
1311 hide_followers: hide_followers
1314 {:error, _} = e -> e
# Returns integer counters as-is and maps every other value (nil, strings, ...)
# to 0.
1319 defp normalize_counter(counter) when is_integer(counter), do: counter
1320 defp normalize_counter(_), do: 0
# Enriches `user_data` with remote follow information when all of these hold:
# external user synchronization is enabled in config, the user type is "Person"
# or "Service", and both follower and following addresses are present. The
# fetched info is merged into user_data[:info]. A failed fetch is logged with the
# user's ap_id.
# NOTE(review): the type check reads user_data[:type], while
# object_to_user_data/1 visibly sets :actor_type — confirm that :type is actually
# populated, otherwise this check can never pass.
1322 def maybe_update_follow_information(user_data) do
1323 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1324 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1326 {:collections_available,
1327 !!(user_data[:following_address] && user_data[:follower_address])},
1329 fetch_follow_information_for_user(user_data) do
1330 info = Map.merge(user_data[:info] || %{}, info)
1333 |> Map.put(:info, info)
1335 {:user_type_check, false} ->
1338 {:collections_available, false} ->
1341 {:enabled, false} ->
1346 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Decides whether a remote collection is private (hidden).
# - "first" embedded as a (Ordered)CollectionPage: handled by the first clause;
# - "first" given as a reference: the page is fetched; a 401/403 response means
#   private (`{:ok, true}`), other errors are passed through;
# - no "first" at all: treated as private (`{:ok, true}`).
# NOTE(review): the results of the two success paths are not visible here.
1353 defp collection_private(%{"first" => %{"type" => type}})
1354 when type in ["CollectionPage", "OrderedCollectionPage"],
1357 defp collection_private(%{"first" => first}) do
1358 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1359 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1362 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1363 {:error, _} = e -> e
1368 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through the MRF filters and, when accepted, converts it
# with object_to_user_data/1, returning `{:ok, user_data}`.
1370 def user_data_from_user_object(data) do
1371 with {:ok, data} <- MRF.filter(data) do
1372 {:ok, object_to_user_data(data)}
# Fetches the remote actor at `ap_id`, converts it to user data and optionally
# enriches it with follow information.
# Failures are logged at different levels: a deleted object at debug, an MRF
# rejection at info, anything else at error.
# NOTE(review): the values returned from the failure branches are not visible.
1378 def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
1379 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
1380 {:ok, data} <- user_data_from_user_object(data) do
1381 {:ok, maybe_update_follow_information(data)}
1383 # If this has been deleted, only log a debug and not an error
1384 {:error, "Object has been deleted" = e} ->
1385 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1388 {:error, {:reject, reason} = e} ->
1389 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1393 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Resolves nickname clashes before a new remote user is stored: when another user
# already holds the nickname under a different ap_id, the old user is renamed to
# "<old id>.<old nickname>". If the existing user has the same ap_id, nothing is
# changed and a message about a possible race condition is produced.
1398 def maybe_handle_clashing_nickname(data) do
1399 with nickname when is_binary(nickname) <- data[:nickname],
1400 %User{} = old_user <- User.get_by_nickname(nickname),
1401 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1403 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1409 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1410 |> User.update_and_set_cache()
1412 {:ap_id_comparison, true} ->
1414 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
# Creates or refreshes the local record of a remote user identified by `ap_id`.
# A cached user that is not yet AP-enabled is upgraded through the Transmogrifier.
# Otherwise the remote data is fetched and applied through a remote-user
# changeset, with nickname clashes handled first for new users.
# NOTE(review): the branching between updating an existing user and inserting a
# new one is only partly visible here.
1422 def make_user_from_ap_id(ap_id, opts \\ []) do
1423 user = User.get_cached_by_ap_id(ap_id)
1425 if user && !User.ap_enabled?(user) do
1426 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1428 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
1431 |> User.remote_user_changeset(data)
1432 |> User.update_and_set_cache()
1434 maybe_handle_clashing_nickname(data)
1437 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and, when a non-nil ap_id is found,
# delegates to make_user_from_ap_id/1. Any other WebFinger result yields
# `{:error, "No AP id in WebFinger"}`.
1445 def make_user_from_nickname(nickname) do
1446 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1447 make_user_from_ap_id(ap_id)
1449 _e -> {:error, "No AP id in WebFinger"}
1453 # filter out broken threads
# Delegates to entire_thread_visible_for_user?/2 for the given activity and user.
1454 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1455 entire_thread_visible_for_user?(activity, user)
1458 # do post-processing on a specific activity
# Currently the only visible step is the broken-thread check
# (contain_broken_threads/2).
1459 def contain_activity(%Activity{} = activity, %User{} = user) do
1460 contain_broken_threads(activity, user)
# Builds a query for "Create" activities with "direct" visibility, ordered by id
# ascending.
# NOTE(review): the base of the pipeline is not visible here.
1463 def fetch_direct_messages_query do
1465 |> restrict_type(%{type: "Create"})
1466 |> restrict_visibility(%{visibility: "direct"})
1467 |> order_by([activity], asc: activity.id)