1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
14 alias Pleroma.Notification
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
33 require Pleroma.Constants
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
52 defp check_actor_is_active(nil), do: true
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
66 defp check_remote_limit(_), do: true
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
85 defp increase_replies_count_if_reply(_create_data), do: :noop
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
99 Repo.insert(%Activity{
102 recipients: recipients,
103 actor: object["actor"]
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
126 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
127 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 %Activity{} = activity ->
138 {:containment, _} = error ->
141 {:error, _} = error ->
144 {:fake, true, map, recipients} ->
145 activity = %Activity{
149 recipients: recipients,
153 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
156 {:remote_limit_pass, _} ->
157 {:error, :remote_limit}
164 defp insert_activity_with_expiration(data, local, recipients) do
168 actor: data["actor"],
169 recipients: recipients
172 with {:ok, activity} <- Repo.insert(struct) do
173 maybe_create_activity_expiration(activity)
177 def notify_and_stream(activity) do
178 Notification.create_notifications(activity)
180 conversation = create_or_bump_conversation(activity, activity.actor)
181 participations = get_participations(conversation)
183 stream_out_participations(participations)
186 defp maybe_create_activity_expiration(
187 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
190 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
191 activity_id: activity.id,
192 expires_at: expires_at
198 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
200 defp create_or_bump_conversation(activity, actor) do
201 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
202 %User{} = user <- User.get_cached_by_ap_id(actor) do
203 Participation.mark_as_read(user, conversation)
208 defp get_participations({:ok, conversation}) do
210 |> Repo.preload(:participations, force: true)
211 |> Map.get(:participations)
214 defp get_participations(_), do: []
216 def stream_out_participations(participations) do
219 |> Repo.preload(:user)
221 Streamer.stream("participation", participations)
224 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
225 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
226 conversation = Repo.preload(conversation, :participations)
229 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
240 def stream_out_participations(_, _), do: :noop
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
249 def stream_out(_activity) do
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # only accept false as false value
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
269 %{to: to, actor: actor, published: published, context: context, object: object},
273 with {:ok, activity} <- insert(create_data, local, fake),
274 {:fake, false, activity} <- {:fake, fake, activity},
275 _ <- increase_replies_count_if_reply(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 _ <- notify_and_stream(activity),
279 :ok <- maybe_federate(activity) do
282 {:quick_insert, true, activity} ->
285 {:fake, true, activity} ->
289 Repo.rollback(message)
293 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
295 additional = params[:additional] || %{}
296 # only accept false as false value
297 local = !(params[:local] == false)
298 published = params[:published]
302 %{to: to, actor: actor, published: published, context: context, object: object},
306 with {:ok, activity} <- insert(listen_data, local),
307 _ <- notify_and_stream(activity),
308 :ok <- maybe_federate(activity) do
313 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
314 {:ok, Activity.t()} | nil | {:error, any()}
315 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
316 with {:ok, result} <-
317 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
322 defp do_unfollow(follower, followed, activity_id, local) do
323 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
324 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
325 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
326 {:ok, activity} <- insert(unfollow_data, local),
327 _ <- notify_and_stream(activity),
328 :ok <- maybe_federate(activity) do
332 {:error, error} -> Repo.rollback(error)
336 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
338 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
352 # only accept false as false value
353 local = !(params[:local] == false)
354 forward = !(params[:forward] == false)
356 additional = params[:additional] || %{}
360 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
362 Map.merge(additional, %{"to" => [], "cc" => []})
365 with flag_data <- make_flag_data(params, additional),
366 {:ok, activity} <- insert(flag_data, local),
367 {:ok, stripped_activity} <- strip_report_status_data(activity),
368 _ <- notify_and_stream(activity),
370 maybe_federate(stripped_activity) do
371 User.all_superusers()
372 |> Enum.filter(fn user -> not is_nil(user.email) end)
373 |> Enum.each(fn superuser ->
375 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
376 |> Pleroma.Emails.Mailer.deliver_async()
381 {:error, error} -> Repo.rollback(error)
385 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
386 def move(%User{} = origin, %User{} = target, local \\ true) do
389 "actor" => origin.ap_id,
390 "object" => origin.ap_id,
391 "target" => target.ap_id
394 with true <- origin.ap_id in target.also_known_as,
395 {:ok, activity} <- insert(params, local),
396 _ <- notify_and_stream(activity) do
397 maybe_federate(activity)
399 BackgroundWorker.enqueue("move_following", %{
400 "origin_id" => origin.id,
401 "target_id" => target.id
406 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
411 def fetch_activities_for_context_query(context, opts) do
412 public = [Constants.as_public()]
416 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
419 from(activity in Activity)
420 |> maybe_preload_objects(opts)
421 |> maybe_preload_bookmarks(opts)
422 |> maybe_set_thread_muted_field(opts)
423 |> restrict_blocked(opts)
424 |> restrict_recipients(recipients, opts[:user])
425 |> restrict_filtered(opts)
429 "?->>'type' = ? and ?->>'context' = ?",
436 |> exclude_poll_votes(opts)
438 |> order_by([activity], desc: activity.id)
441 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
442 def fetch_activities_for_context(context, opts \\ %{}) do
444 |> fetch_activities_for_context_query(opts)
448 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
449 FlakeId.Ecto.CompatType.t() | nil
450 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
452 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
453 |> restrict_visibility(%{visibility: "direct"})
459 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
460 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
461 opts = Map.delete(opts, :user)
463 [Constants.as_public()]
464 |> fetch_activities_query(opts)
465 |> restrict_unlisted(opts)
466 |> Pagination.fetch_paginated(opts, pagination)
469 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
470 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
472 |> Map.put(:restrict_unlisted, true)
473 |> fetch_public_or_unlisted_activities(pagination)
476 @valid_visibilities ~w[direct unlisted public private]
478 defp restrict_visibility(query, %{visibility: visibility})
479 when is_list(visibility) do
480 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
485 "activity_visibility(?, ?, ?) = ANY (?)",
493 Logger.error("Could not restrict visibility to #{visibility}")
497 defp restrict_visibility(query, %{visibility: visibility})
498 when visibility in @valid_visibilities do
502 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
506 defp restrict_visibility(_query, %{visibility: visibility})
507 when visibility not in @valid_visibilities do
508 Logger.error("Could not restrict visibility to #{visibility}")
511 defp restrict_visibility(query, _visibility), do: query
513 defp exclude_visibility(query, %{exclude_visibilities: visibility})
514 when is_list(visibility) do
515 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
520 "activity_visibility(?, ?, ?) = ANY (?)",
528 Logger.error("Could not exclude visibility to #{visibility}")
533 defp exclude_visibility(query, %{exclude_visibilities: visibility})
534 when visibility in @valid_visibilities do
539 "activity_visibility(?, ?, ?) = ?",
548 defp exclude_visibility(query, %{exclude_visibilities: visibility})
549 when visibility not in [nil | @valid_visibilities] do
550 Logger.error("Could not exclude visibility to #{visibility}")
554 defp exclude_visibility(query, _visibility), do: query
556 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
559 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
562 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
565 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
569 defp restrict_thread_visibility(query, _, _), do: query
571 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
574 |> Map.put(:user, reading_user)
575 |> Map.put(:actor_id, user.ap_id)
578 godmode: params[:godmode],
579 reading_user: reading_user
581 |> user_activities_recipients()
582 |> fetch_activities(params)
586 def fetch_user_activities(user, reading_user, params \\ %{}) do
589 |> Map.put(:type, ["Create", "Announce"])
590 |> Map.put(:user, reading_user)
591 |> Map.put(:actor_id, user.ap_id)
592 |> Map.put(:pinned_activity_ids, user.pinned_activities)
595 if User.blocks?(reading_user, user) do
599 |> Map.put(:blocking_user, reading_user)
600 |> Map.put(:muting_user, reading_user)
604 godmode: params[:godmode],
605 reading_user: reading_user
607 |> user_activities_recipients()
608 |> fetch_activities(params)
612 def fetch_statuses(reading_user, params) do
613 params = Map.put(params, :type, ["Create", "Announce"])
616 godmode: params[:godmode],
617 reading_user: reading_user
619 |> user_activities_recipients()
620 |> fetch_activities(params, :offset)
624 defp user_activities_recipients(%{godmode: true}), do: []
626 defp user_activities_recipients(%{reading_user: reading_user}) do
628 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
630 [Constants.as_public()]
634 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
635 raise "Can't use the child object without preloading!"
638 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
640 [activity, object] in query,
643 "?->>'type' != ? or ?->>'actor' != ?",
652 defp restrict_announce_object_actor(query, _), do: query
654 defp restrict_since(query, %{since_id: ""}), do: query
656 defp restrict_since(query, %{since_id: since_id}) do
657 from(activity in query, where: activity.id > ^since_id)
660 defp restrict_since(query, _), do: query
662 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
663 raise_on_missing_preload()
666 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
668 [_activity, object] in query,
669 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
673 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
674 restrict_tag_reject(query, %{tag_reject: [tag_reject]})
677 defp restrict_tag_reject(query, _), do: query
679 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
680 raise_on_missing_preload()
683 defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
685 [_activity, object] in query,
686 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
690 defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
691 restrict_tag(query, %{tag: tag})
694 defp restrict_tag_all(query, _), do: query
696 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
697 raise_on_missing_preload()
700 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
702 [_activity, object] in query,
703 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
707 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
708 restrict_tag(query, %{tag: [tag]})
711 defp restrict_tag(query, _), do: query
713 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
714 raise_on_missing_preload()
717 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
718 if has_named_binding?(query, :thread_mute) do
720 [activity, object, thread_mute] in query,
721 group_by: [activity.id, object.id, thread_mute.id]
725 [activity, object] in query,
726 group_by: [activity.id, object.id]
729 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
732 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
736 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
737 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
740 defp restrict_hashtag_reject_any(query, _), do: query
742 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
743 raise_on_missing_preload()
746 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
750 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
754 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
755 restrict_hashtag_any(query, %{tag: tag})
758 defp restrict_hashtag_all(query, _), do: query
760 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
761 raise_on_missing_preload()
764 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
766 [_activity, object] in query,
767 join: hashtag in assoc(object, :hashtags),
768 where: hashtag.name in ^tags
772 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
773 restrict_hashtag_any(query, %{tag: [tag]})
776 defp restrict_hashtag_any(query, _), do: query
778 defp raise_on_missing_preload do
779 raise "Can't use the child object without preloading!"
782 defp restrict_recipients(query, [], _user), do: query
784 defp restrict_recipients(query, recipients, nil) do
785 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
788 defp restrict_recipients(query, recipients, user) do
791 where: fragment("? && ?", ^recipients, activity.recipients),
792 or_where: activity.actor == ^user.ap_id
796 defp restrict_local(query, %{local_only: true}) do
797 from(activity in query, where: activity.local == true)
800 defp restrict_local(query, _), do: query
802 defp restrict_actor(query, %{actor_id: actor_id}) do
803 from(activity in query, where: activity.actor == ^actor_id)
806 defp restrict_actor(query, _), do: query
808 defp restrict_type(query, %{type: type}) when is_binary(type) do
809 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
812 defp restrict_type(query, %{type: type}) do
813 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
816 defp restrict_type(query, _), do: query
818 defp restrict_state(query, %{state: state}) do
819 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
822 defp restrict_state(query, _), do: query
824 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
826 [_activity, object] in query,
827 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
831 defp restrict_favorited_by(query, _), do: query
833 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
834 raise "Can't use the child object without preloading!"
837 defp restrict_media(query, %{only_media: true}) do
839 [activity, object] in query,
840 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
841 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
845 defp restrict_media(query, _), do: query
847 defp restrict_replies(query, %{exclude_replies: true}) do
849 [_activity, object] in query,
850 where: fragment("?->>'inReplyTo' is null", object.data)
854 defp restrict_replies(query, %{
855 reply_filtering_user: %User{} = user,
856 reply_visibility: "self"
859 [activity, object] in query,
862 "?->>'inReplyTo' is null OR ? = ANY(?)",
870 defp restrict_replies(query, %{
871 reply_filtering_user: %User{} = user,
872 reply_visibility: "following"
875 [activity, object] in query,
879 ?->>'type' != 'Create' -- This isn't a Create
880 OR ?->>'inReplyTo' is null -- this isn't a reply
881 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
882 -- unless they are the author (because authors
883 -- are also part of the recipients). This leads
884 -- to a bug that self-replies by friends won't
886 OR ? = ? -- The actor is us
890 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
899 defp restrict_replies(query, _), do: query
901 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
902 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
905 defp restrict_reblogs(query, _), do: query
907 defp restrict_muted(query, %{with_muted: true}), do: query
909 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
910 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
913 from([activity] in query,
914 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
917 "not (?->'to' \\?| ?) or ? = ?",
925 unless opts[:skip_preload] do
926 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
932 defp restrict_muted(query, _), do: query
934 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
935 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
936 domain_blocks = user.domain_blocks || []
938 following_ap_ids = User.get_friends_ap_ids(user)
941 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
944 [activity, object: o] in query,
945 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
948 "((not (? && ?)) or ? = ?)",
956 "recipients_contain_blocked_domains(?, ?) = false",
962 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
969 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
977 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
986 defp restrict_blocked(query, _), do: query
988 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
993 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
995 ^[Constants.as_public()]
1000 defp restrict_unlisted(query, _), do: query
1002 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1003 from(activity in query, where: activity.id in ^ids)
1006 defp restrict_pinned(query, _), do: query
1008 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1009 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1015 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1023 defp restrict_muted_reblogs(query, _), do: query
1025 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1028 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1032 defp restrict_instance(query, _), do: query
1034 defp restrict_filtered(query, %{user: %User{} = user}) do
1035 case Filter.compose_regex(user) do
1040 from([activity, object] in query,
1042 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1043 activity.actor == ^user.ap_id
1048 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1049 restrict_filtered(query, %{user: user})
1052 defp restrict_filtered(query, _), do: query
1054 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1056 defp exclude_poll_votes(query, _) do
1057 if has_named_binding?(query, :object) do
1058 from([activity, object: o] in query,
1059 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1066 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1068 defp exclude_chat_messages(query, _) do
1069 if has_named_binding?(query, :object) do
1070 from([activity, object: o] in query,
1071 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1078 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1080 defp exclude_invisible_actors(query, _opts) do
1082 User.Query.build(%{invisible: true, select: [:ap_id]})
1084 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1086 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1089 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1090 from(activity in query, where: activity.id != ^id)
1093 defp exclude_id(query, _), do: query
1095 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1097 defp maybe_preload_objects(query, _) do
1099 |> Activity.with_preloaded_object()
1102 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1104 defp maybe_preload_bookmarks(query, opts) do
1106 |> Activity.with_preloaded_bookmark(opts[:user])
1109 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1111 |> Activity.with_preloaded_report_notes()
1114 defp maybe_preload_report_notes(query, _), do: query
1116 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1118 defp maybe_set_thread_muted_field(query, opts) do
1120 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1123 defp maybe_order(query, %{order: :desc}) do
1125 |> order_by(desc: :id)
1128 defp maybe_order(query, %{order: :asc}) do
1130 |> order_by(asc: :id)
1133 defp maybe_order(query, _), do: query
1135 defp fetch_activities_query_ap_ids_ops(opts) do
1136 source_user = opts[:muting_user]
1137 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1139 ap_id_relationships =
1140 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1141 [:block | ap_id_relationships]
1146 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1148 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1149 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1151 restrict_muted_reblogs_opts =
1152 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1154 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1157 def fetch_activities_query(recipients, opts \\ %{}) do
1158 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1159 fetch_activities_query_ap_ids_ops(opts)
1162 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1167 |> distinct([a], true)
1168 |> maybe_preload_objects(opts)
1169 |> maybe_preload_bookmarks(opts)
1170 |> maybe_preload_report_notes(opts)
1171 |> maybe_set_thread_muted_field(opts)
1172 |> maybe_order(opts)
1173 |> restrict_recipients(recipients, opts[:user])
1174 |> restrict_replies(opts)
1175 |> restrict_since(opts)
1176 |> restrict_local(opts)
1177 |> restrict_actor(opts)
1178 |> restrict_type(opts)
1179 |> restrict_state(opts)
1180 |> restrict_favorited_by(opts)
1181 |> restrict_blocked(restrict_blocked_opts)
1182 |> restrict_muted(restrict_muted_opts)
1183 |> restrict_filtered(opts)
1184 |> restrict_media(opts)
1185 |> restrict_visibility(opts)
1186 |> restrict_thread_visibility(opts, config)
1187 |> restrict_reblogs(opts)
1188 |> restrict_pinned(opts)
1189 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1190 |> restrict_instance(opts)
1191 |> restrict_announce_object_actor(opts)
1192 |> restrict_filtered(opts)
1193 |> Activity.restrict_deactivated_users()
1194 |> exclude_poll_votes(opts)
1195 |> exclude_chat_messages(opts)
1196 |> exclude_invisible_actors(opts)
1197 |> exclude_visibility(opts)
1199 if Config.get([:instance, :improved_hashtag_timeline]) do
1201 |> restrict_hashtag_any(opts)
1202 |> restrict_hashtag_all(opts)
1203 |> restrict_hashtag_reject_any(opts)
1206 |> restrict_tag(opts)
1207 |> restrict_tag_reject(opts)
1208 |> restrict_tag_all(opts)
1212 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1213 list_memberships = Pleroma.List.memberships(opts[:user])
1215 fetch_activities_query(recipients ++ list_memberships, opts)
1216 |> Pagination.fetch_paginated(opts, pagination)
1218 |> maybe_update_cc(list_memberships, opts[:user])
1222 Fetch favorites activities of user with order by sort adds to favorites
1224 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1225 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1227 |> Activity.Queries.by_actor()
1228 |> Activity.Queries.by_type("Like")
1229 |> Activity.with_joined_object()
1230 |> Object.with_joined_activity()
1231 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1232 |> order_by([like, _, _], desc_nulls_last: like.id)
1233 |> Pagination.fetch_paginated(
1234 Map.merge(params, %{skip_order: true}),
1239 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1240 Enum.map(activities, fn
1241 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1242 if Enum.any?(bcc, &(&1 in list_memberships)) do
1243 update_in(activity.data["cc"], &[user_ap_id | &1])
1253 defp maybe_update_cc(activities, _, _), do: activities
1255 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1256 from(activity in query,
1258 fragment("? && ?", activity.recipients, ^recipients) or
1259 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1260 ^Constants.as_public() in activity.recipients)
1264 def fetch_activities_bounded(
1266 recipients_with_public,
1268 pagination \\ :keyset
1270 fetch_activities_query([], opts)
1271 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1272 |> Pagination.fetch_paginated(opts, pagination)
1276 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1277 def upload(file, opts \\ []) do
1278 with {:ok, data} <- Upload.store(file, opts) do
1279 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1281 Repo.insert(%Object{data: obj_data})
1285 @spec get_actor_url(any()) :: binary() | nil
1286 defp get_actor_url(url) when is_binary(url), do: url
1287 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1289 defp get_actor_url(url) when is_list(url) do
1295 defp get_actor_url(_url), do: nil
1297 defp object_to_user_data(data) do
1299 data["icon"]["url"] &&
1302 "url" => [%{"href" => data["icon"]["url"]}]
1306 data["image"]["url"] &&
1309 "url" => [%{"href" => data["image"]["url"]}]
1314 |> Map.get("attachment", [])
1315 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1316 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1320 |> Map.get("tag", [])
1322 %{"type" => "Emoji"} -> true
1325 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1326 {String.trim(name, ":"), url}
1329 is_locked = data["manuallyApprovesFollowers"] || false
1330 capabilities = data["capabilities"] || %{}
1331 accepts_chat_messages = capabilities["acceptsChatMessages"]
1332 data = Transmogrifier.maybe_fix_user_object(data)
1333 is_discoverable = data["discoverable"] || false
1334 invisible = data["invisible"] || false
1335 actor_type = data["type"] || "Person"
1338 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1339 data["publicKey"]["publicKeyPem"]
1345 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1346 data["endpoints"]["sharedInbox"]
1353 uri: get_actor_url(data["url"]),
1358 is_locked: is_locked,
1359 is_discoverable: is_discoverable,
1360 invisible: invisible,
1363 follower_address: data["followers"],
1364 following_address: data["following"],
1365 bio: data["summary"] || "",
1366 actor_type: actor_type,
1367 also_known_as: Map.get(data, "alsoKnownAs", []),
1368 public_key: public_key,
1369 inbox: data["inbox"],
1370 shared_inbox: shared_inbox,
1371 accepts_chat_messages: accepts_chat_messages
1374 # nickname can be nil because of virtual actors
1375 if data["preferredUsername"] do
1379 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1382 Map.put(user_data, :nickname, nil)
1386 def fetch_follow_information_for_user(user) do
1387 with {:ok, following_data} <-
1388 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1389 {:ok, hide_follows} <- collection_private(following_data),
1390 {:ok, followers_data} <-
1391 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1392 {:ok, hide_followers} <- collection_private(followers_data) do
1395 hide_follows: hide_follows,
1396 follower_count: normalize_counter(followers_data["totalItems"]),
1397 following_count: normalize_counter(following_data["totalItems"]),
1398 hide_followers: hide_followers
1401 {:error, _} = e -> e
1406 defp normalize_counter(counter) when is_integer(counter), do: counter
1407 defp normalize_counter(_), do: 0
1409 def maybe_update_follow_information(user_data) do
1410 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1411 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1413 {:collections_available,
1414 !!(user_data[:following_address] && user_data[:follower_address])},
1416 fetch_follow_information_for_user(user_data) do
1417 info = Map.merge(user_data[:info] || %{}, info)
1420 |> Map.put(:info, info)
1422 {:user_type_check, false} ->
1425 {:collections_available, false} ->
1428 {:enabled, false} ->
1433 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1440 defp collection_private(%{"first" => %{"type" => type}})
1441 when type in ["CollectionPage", "OrderedCollectionPage"],
1444 defp collection_private(%{"first" => first}) do
1445 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1446 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1449 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1450 {:error, _} = e -> e
1455 defp collection_private(_data), do: {:ok, true}
1457 def user_data_from_user_object(data) do
1458 with {:ok, data} <- MRF.filter(data) do
1459 {:ok, object_to_user_data(data)}
1465 def fetch_and_prepare_user_from_ap_id(ap_id) do
1466 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1467 {:ok, data} <- user_data_from_user_object(data) do
1468 {:ok, maybe_update_follow_information(data)}
1470 # If this has been deleted, only log a debug and not an error
1471 {:error, "Object has been deleted" = e} ->
1472 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1475 {:error, {:reject, reason} = e} ->
1476 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1480 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1485 def maybe_handle_clashing_nickname(data) do
1486 with nickname when is_binary(nickname) <- data[:nickname],
1487 %User{} = old_user <- User.get_by_nickname(nickname),
1488 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1490 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1496 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1497 |> User.update_and_set_cache()
1499 {:ap_id_comparison, true} ->
1501 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1509 def make_user_from_ap_id(ap_id) do
1510 user = User.get_cached_by_ap_id(ap_id)
1512 if user && !User.ap_enabled?(user) do
1513 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1515 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1518 |> User.remote_user_changeset(data)
1519 |> User.update_and_set_cache()
1521 maybe_handle_clashing_nickname(data)
1524 |> User.remote_user_changeset()
1532 def make_user_from_nickname(nickname) do
1533 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1534 make_user_from_ap_id(ap_id)
1536 _e -> {:error, "No AP id in WebFinger"}
1540 # filter out broken threads
1541 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1542 entire_thread_visible_for_user?(activity, user)
1545 # do post-processing on a specific activity
1546 def contain_activity(%Activity{} = activity, %User{} = user) do
1547 contain_broken_threads(activity, user)
1550 def fetch_direct_messages_query do
1552 |> restrict_type(%{type: "Create"})
1553 |> restrict_visibility(%{visibility: "direct"})
1554 |> order_by([activity], asc: activity.id)