1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
8 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Notification
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
36 require Pleroma.Constants
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
68 defp check_actor_can_insert(_), do: true
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
75 defp check_remote_limit(_), do: true
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
98 defp increase_replies_count_if_reply(_create_data), do: :noop
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
108 @unpersisted_activity_types ~w[Undo Delete Remove Accept Reject]
110 def persist(%{"type" => type} = object, [local: false] = meta)
111 when type in @unpersisted_activity_types do
113 {recipients, _, _} = get_recipients(object)
115 unpersisted = %Activity{
118 recipients: recipients,
119 actor: object["actor"]
122 {:ok, unpersisted, meta}
126 def persist(object, meta) do
127 with local <- Keyword.fetch!(meta, :local),
128 {recipients, _, _} <- get_recipients(object),
130 Repo.insert(%Activity{
133 recipients: recipients,
134 actor: object["actor"]
136 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
137 {:ok, _} <- maybe_create_activity_expiration(activity) do
138 {:ok, activity, meta}
142 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
143 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
144 with nil <- Activity.normalize(map),
145 map <- lazy_put_activity_defaults(map, fake),
146 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
147 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
148 {:ok, map} <- MRF.filter(map),
149 {recipients, _, _} = get_recipients(map),
150 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
151 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
152 {:ok, map, object} <- insert_full_object(map),
153 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
154 # Splice in the child object if we have one.
155 activity = Maps.put_if_present(activity, :object, object)
157 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
158 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
161 # Add local posts to search index
162 if local, do: Pleroma.Search.add_to_index(activity)
166 %Activity{} = activity ->
172 {:containment, _} = error ->
175 {:error, _} = error ->
178 {:fake, true, map, recipients} ->
179 activity = %Activity{
183 recipients: recipients,
187 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
190 {:remote_limit_pass, _} ->
191 {:error, :remote_limit}
198 defp insert_activity_with_expiration(data, local, recipients) do
202 actor: data["actor"],
203 recipients: recipients
206 with {:ok, activity} <- Repo.insert(struct) do
207 maybe_create_activity_expiration(activity)
211 def notify_and_stream(activity) do
212 Notification.create_notifications(activity)
216 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
217 Activity.get_create_by_object_ap_id_with_object(id)
223 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
224 participations = get_participations(conversation)
226 stream_out_participations(participations)
229 defp maybe_create_activity_expiration(
230 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
233 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
234 activity_id: activity.id,
235 expires_at: expires_at
241 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
243 defp create_or_bump_conversation(activity, actor) do
244 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
245 %User{} = user <- User.get_cached_by_ap_id(actor) do
246 Participation.mark_as_read(user, conversation)
251 defp get_participations({:ok, conversation}) do
253 |> Repo.preload(:participations, force: true)
254 |> Map.get(:participations)
257 defp get_participations(_), do: []
259 def stream_out_participations(participations) do
262 |> Repo.preload(:user)
264 Streamer.stream("participation", participations)
268 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
269 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
270 conversation = Repo.preload(conversation, :participations)
273 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
278 if last_activity_id do
279 stream_out_participations(conversation.participations)
285 def stream_out_participations(_, _), do: :noop
288 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
289 when data_type in ["Create", "Announce", "Delete", "Update"] do
291 |> Topics.get_activity_topics()
292 |> Streamer.stream(activity)
296 def stream_out(_activity) do
300 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
301 def create(params, fake \\ false) do
302 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
307 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
308 additional = params[:additional] || %{}
309 # only accept false as false value
310 local = !(params[:local] == false)
311 published = params[:published]
312 quick_insert? = Config.get([:env]) == :benchmark
316 %{to: to, actor: actor, published: published, context: context, object: object},
320 with {:ok, activity} <- insert(create_data, local, fake),
321 {:fake, false, activity} <- {:fake, fake, activity},
322 _ <- increase_replies_count_if_reply(create_data),
323 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
324 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
325 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_schedule_poll_notifications(activity),
328 :ok <- maybe_federate(activity) do
331 {:quick_insert, true, activity} ->
334 {:fake, true, activity} ->
338 Repo.rollback(message)
342 defp maybe_schedule_poll_notifications(activity) do
343 PollWorker.schedule_poll_end(activity)
347 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
348 {:ok, Activity.t()} | nil | {:error, any()}
349 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
350 with {:ok, result} <-
351 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
356 defp do_unfollow(follower, followed, activity_id, local)
358 defp do_unfollow(follower, followed, activity_id, local) when local == true do
359 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
360 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
361 {:ok, activity} <- insert(unfollow_data, local),
362 {:ok, _activity} <- Repo.delete(follow_activity),
363 _ <- notify_and_stream(activity),
364 :ok <- maybe_federate(activity) do
368 {:error, error} -> Repo.rollback(error)
372 defp do_unfollow(follower, followed, activity_id, false) do
373 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
374 # uses deterministic ids for follows.
375 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
376 {:ok, _activity} <- Repo.delete(follow_activity),
377 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
378 unfollow_activity <- make_unfollow_activity(unfollow_data, false),
379 _ <- notify_and_stream(unfollow_activity) do
380 {:ok, unfollow_activity}
383 {:error, error} -> Repo.rollback(error)
387 defp make_unfollow_activity(data, local) do
388 {recipients, _, _} = get_recipients(data)
393 actor: data["actor"],
394 recipients: recipients
398 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
400 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
414 # only accept false as false value
415 local = !(params[:local] == false)
416 forward = !(params[:forward] == false)
418 additional = params[:additional] || %{}
422 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
424 Map.merge(additional, %{"to" => [], "cc" => []})
427 with flag_data <- make_flag_data(params, additional),
428 {:ok, activity} <- insert(flag_data, local),
429 {:ok, stripped_activity} <- strip_report_status_data(activity),
430 _ <- notify_and_stream(activity),
432 maybe_federate(stripped_activity) do
433 User.all_superusers()
434 |> Enum.filter(fn user -> user.ap_id != actor end)
435 |> Enum.filter(fn user -> not is_nil(user.email) end)
436 |> Enum.each(fn superuser ->
438 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
439 |> Pleroma.Emails.Mailer.deliver_async()
444 {:error, error} -> Repo.rollback(error)
448 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
449 def move(%User{} = origin, %User{} = target, local \\ true) do
452 "actor" => origin.ap_id,
453 "object" => origin.ap_id,
454 "target" => target.ap_id,
455 "to" => [origin.follower_address]
458 with true <- origin.ap_id in target.also_known_as,
459 {:ok, activity} <- insert(params, local),
460 _ <- notify_and_stream(activity) do
461 maybe_federate(activity)
463 BackgroundWorker.enqueue("move_following", %{
464 "origin_id" => origin.id,
465 "target_id" => target.id
470 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
475 def fetch_activities_for_context_query(context, opts) do
476 public = [Constants.as_public()]
480 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
483 from(activity in Activity)
484 |> maybe_preload_objects(opts)
485 |> maybe_preload_bookmarks(opts)
486 |> maybe_set_thread_muted_field(opts)
487 |> restrict_blocked(opts)
488 |> restrict_blockers_visibility(opts)
489 |> restrict_recipients(recipients, opts[:user])
490 |> restrict_filtered(opts)
494 "?->>'type' = ? and ?->>'context' = ?",
501 |> exclude_poll_votes(opts)
503 |> order_by([activity], desc: activity.id)
506 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
507 def fetch_activities_for_context(context, opts \\ %{}) do
509 |> fetch_activities_for_context_query(opts)
513 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
514 FlakeId.Ecto.CompatType.t() | nil
515 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
517 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
518 |> restrict_visibility(%{visibility: "direct"})
524 defp fetch_paginated_optimized(query, opts, pagination) do
525 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
526 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
527 opts = Map.put(opts, :skip_extra_order, true)
529 Pagination.fetch_paginated(query, opts, pagination)
532 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
533 list_memberships = Pleroma.List.memberships(opts[:user])
535 fetch_activities_query(recipients ++ list_memberships, opts)
536 |> fetch_paginated_optimized(opts, pagination)
538 |> maybe_update_cc(list_memberships, opts[:user])
541 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
542 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
543 includes_local_public = Map.get(opts, :includes_local_public, false)
545 opts = Map.delete(opts, :user)
547 intended_recipients =
548 if includes_local_public do
549 [Constants.as_public(), as_local_public()]
551 [Constants.as_public()]
555 |> fetch_activities_query(opts)
556 |> restrict_unlisted(opts)
557 |> fetch_paginated_optimized(opts, pagination)
560 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
561 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
563 |> Map.put(:restrict_unlisted, true)
564 |> fetch_public_or_unlisted_activities(pagination)
567 @valid_visibilities ~w[direct unlisted public private]
569 defp restrict_visibility(query, %{visibility: visibility})
570 when is_list(visibility) do
571 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
576 "activity_visibility(?, ?, ?) = ANY (?)",
584 Logger.error("Could not restrict visibility to #{visibility}")
588 defp restrict_visibility(query, %{visibility: visibility})
589 when visibility in @valid_visibilities do
593 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
597 defp restrict_visibility(_query, %{visibility: visibility})
598 when visibility not in @valid_visibilities do
599 Logger.error("Could not restrict visibility to #{visibility}")
602 defp restrict_visibility(query, _visibility), do: query
604 defp exclude_visibility(query, %{exclude_visibilities: visibility})
605 when is_list(visibility) do
606 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
611 "activity_visibility(?, ?, ?) = ANY (?)",
619 Logger.error("Could not exclude visibility to #{visibility}")
624 defp exclude_visibility(query, %{exclude_visibilities: visibility})
625 when visibility in @valid_visibilities do
630 "activity_visibility(?, ?, ?) = ?",
639 defp exclude_visibility(query, %{exclude_visibilities: visibility})
640 when visibility not in [nil | @valid_visibilities] do
641 Logger.error("Could not exclude visibility to #{visibility}")
645 defp exclude_visibility(query, _visibility), do: query
647 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
650 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
653 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
654 local_public = as_local_public()
658 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
662 defp restrict_thread_visibility(query, _, _), do: query
664 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
667 |> Map.put(:user, reading_user)
668 |> Map.put(:actor_id, user.ap_id)
671 godmode: params[:godmode],
672 reading_user: reading_user
674 |> user_activities_recipients()
675 |> fetch_activities(params)
679 def fetch_user_activities(user, reading_user, params \\ %{})
681 def fetch_user_activities(user, reading_user, %{total: true} = params) do
682 result = fetch_activities_for_user(user, reading_user, params)
684 Keyword.put(result, :items, Enum.reverse(result[:items]))
687 def fetch_user_activities(user, reading_user, params) do
689 |> fetch_activities_for_user(reading_user, params)
693 defp fetch_activities_for_user(user, reading_user, params) do
696 |> Map.put(:type, ["Create", "Announce"])
697 |> Map.put(:user, reading_user)
698 |> Map.put(:actor_id, user.ap_id)
699 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
702 if User.blocks?(reading_user, user) do
706 |> Map.put(:blocking_user, reading_user)
707 |> Map.put(:muting_user, reading_user)
710 pagination_type = Map.get(params, :pagination_type) || :keyset
713 godmode: params[:godmode],
714 reading_user: reading_user
716 |> user_activities_recipients()
717 |> fetch_activities(params, pagination_type)
720 def fetch_statuses(reading_user, %{total: true} = params) do
721 result = fetch_activities_for_reading_user(reading_user, params)
722 Keyword.put(result, :items, Enum.reverse(result[:items]))
725 def fetch_statuses(reading_user, params) do
727 |> fetch_activities_for_reading_user(params)
731 defp fetch_activities_for_reading_user(reading_user, params) do
732 params = Map.put(params, :type, ["Create", "Announce"])
735 godmode: params[:godmode],
736 reading_user: reading_user
738 |> user_activities_recipients()
739 |> fetch_activities(params, :offset)
742 def user_activities_recipients(%{godmode: true}), do: []
744 def user_activities_recipients(%{reading_user: reading_user}) do
745 if not is_nil(reading_user) and reading_user.local do
747 Constants.as_public(),
749 reading_user.ap_id | User.following(reading_user)
752 [Constants.as_public()]
756 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
757 raise "Can't use the child object without preloading!"
760 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
762 [activity, object] in query,
765 "?->>'type' != ? or ?->>'actor' != ?",
774 defp restrict_announce_object_actor(query, _), do: query
776 defp restrict_since(query, %{since_id: ""}), do: query
778 defp restrict_since(query, %{since_id: since_id}) do
779 from(activity in query, where: activity.id > ^since_id)
782 defp restrict_since(query, _), do: query
784 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
785 raise_on_missing_preload()
788 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
790 [_activity, object] in query,
791 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
795 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
796 restrict_embedded_tag_any(query, %{tag: tag})
799 defp restrict_embedded_tag_all(query, _), do: query
801 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
802 raise_on_missing_preload()
805 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
807 [_activity, object] in query,
808 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
812 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
813 restrict_embedded_tag_any(query, %{tag: [tag]})
816 defp restrict_embedded_tag_any(query, _), do: query
818 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
819 raise_on_missing_preload()
822 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
824 [_activity, object] in query,
825 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
829 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
830 when is_binary(tag_reject) do
831 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
834 defp restrict_embedded_tag_reject_any(query, _), do: query
836 defp object_ids_query_for_tags(tags) do
837 from(hto in "hashtags_objects")
838 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
839 |> where([hto, ht], ht.name in ^tags)
840 |> select([hto], hto.object_id)
841 |> distinct([hto], true)
844 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
845 raise_on_missing_preload()
848 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
849 restrict_hashtag_any(query, %{tag: single_tag})
852 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
854 [_activity, object] in query,
858 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
859 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
860 AND hashtags_objects.object_id = ?) @> ?
869 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
870 restrict_hashtag_all(query, %{tag_all: [tag]})
873 defp restrict_hashtag_all(query, _), do: query
875 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
876 raise_on_missing_preload()
879 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
881 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
884 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
886 [_activity, object] in query,
887 join: hto in "hashtags_objects",
888 on: hto.object_id == object.id,
889 where: hto.hashtag_id in ^hashtag_ids,
890 distinct: [desc: object.id],
891 order_by: [desc: object.id]
895 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
896 restrict_hashtag_any(query, %{tag: [tag]})
899 defp restrict_hashtag_any(query, _), do: query
901 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
902 raise_on_missing_preload()
905 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
907 [_activity, object] in query,
908 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
912 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
913 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
916 defp restrict_hashtag_reject_any(query, _), do: query
918 defp raise_on_missing_preload do
919 raise "Can't use the child object without preloading!"
922 defp restrict_recipients(query, [], _user), do: query
924 defp restrict_recipients(query, recipients, nil) do
925 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
928 defp restrict_recipients(query, recipients, user) do
931 where: fragment("? && ?", ^recipients, activity.recipients),
932 or_where: activity.actor == ^user.ap_id
936 # Essentially, either look for activities addressed to `recipients`, _OR_ ones
937 # that reference a hashtag that the user follows
938 # Firstly, two fallbacks in case there's no hashtag constraint, or the user doesn't
940 defp restrict_recipients_or_hashtags(query, recipients, user, nil) do
941 restrict_recipients(query, recipients, user)
944 defp restrict_recipients_or_hashtags(query, recipients, user, []) do
945 restrict_recipients(query, recipients, user)
948 defp restrict_recipients_or_hashtags(query, recipients, _user, hashtag_ids) do
949 from([activity, object] in query)
950 |> join(:left, [activity, object], hto in "hashtags_objects",
951 on: hto.object_id == object.id,
955 [activity, object, hto: hto],
956 (hto.hashtag_id in ^hashtag_ids and ^Constants.as_public() in activity.recipients) or
957 fragment("? && ?", ^recipients, activity.recipients)
961 defp restrict_local(query, %{local_only: true}) do
962 from(activity in query, where: activity.local == true)
965 defp restrict_local(query, _), do: query
967 defp restrict_remote(query, %{remote: true}) do
968 from(activity in query, where: activity.local == false)
971 defp restrict_remote(query, _), do: query
973 defp restrict_actor(query, %{actor_id: actor_id}) do
974 from(activity in query, where: activity.actor == ^actor_id)
977 defp restrict_actor(query, _), do: query
979 defp restrict_type(query, %{type: type}) when is_binary(type) do
980 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
983 defp restrict_type(query, %{type: type}) do
984 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
987 defp restrict_type(query, _), do: query
989 defp restrict_state(query, %{state: state}) do
990 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
993 defp restrict_state(query, _), do: query
995 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
997 [_activity, object] in query,
998 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
1002 defp restrict_favorited_by(query, _), do: query
1004 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
1005 raise "Can't use the child object without preloading!"
1008 defp restrict_media(query, %{only_media: true}) do
1010 [activity, object] in query,
1011 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
1012 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
1016 defp restrict_media(query, _), do: query
1018 defp restrict_replies(query, %{exclude_replies: true}) do
1020 [_activity, object] in query,
1021 where: fragment("?->>'inReplyTo' is null", object.data)
1025 defp restrict_replies(query, %{
1026 reply_filtering_user: %User{} = user,
1027 reply_visibility: "self"
1030 [activity, object] in query,
1033 "?->>'inReplyTo' is null OR ? = ANY(?)",
1041 defp restrict_replies(query, %{
1042 reply_filtering_user: %User{} = user,
1043 reply_visibility: "following"
1046 [activity, object] in query,
1050 ?->>'type' != 'Create' -- This isn't a Create
1051 OR ?->>'inReplyTo' is null -- this isn't a reply
1052 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1053 -- unless they are the author (because authors
1054 -- are also part of the recipients). This leads
1055 -- to a bug that self-replies by friends won't
1057 OR ? = ? -- The actor is us
1061 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1062 activity.recipients,
1070 defp restrict_replies(query, _), do: query
1072 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1073 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1076 defp restrict_reblogs(query, _), do: query
1078 defp restrict_muted(query, %{with_muted: true}), do: query
1080 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1081 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1084 from([activity] in query,
1085 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1088 "not (?->'to' \\?| ?) or ? = ?",
1096 unless opts[:skip_preload] do
1097 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1103 defp restrict_muted(query, _), do: query
1105 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1106 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1107 domain_blocks = user.domain_blocks || []
1109 following_ap_ids = User.get_friends_ap_ids(user)
1112 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1115 [activity, object: o] in query,
1116 # You don't block the author
1117 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1119 # You don't block any recipients, and didn't author the post
1122 "((not (? && ?)) or ? = ?)",
1123 activity.recipients,
1129 # You don't block the domain of any recipients, and didn't author the post
1132 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1133 activity.recipients,
1139 # It's not a boost of a user you block
1142 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1148 # You don't block the author's domain, and also don't follow the author
1151 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1158 # Same as above, but checks the Object
1161 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1170 defp restrict_blocked(query, _), do: query
1172 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1173 if Config.get([:activitypub, :blockers_visible]) == true do
1176 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1180 # The author doesn't block you
1181 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1183 # It's not a boost of a user that blocks you
1186 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1195 defp restrict_blockers_visibility(query, _), do: query
1197 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1202 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1204 ^[Constants.as_public()]
1209 defp restrict_unlisted(query, _), do: query
1211 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1213 [activity, object: o] in query,
1216 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1225 defp restrict_pinned(query, _), do: query
1227 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1228 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1234 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1242 defp restrict_muted_reblogs(query, _), do: query
1244 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1247 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1251 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1254 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1258 defp restrict_instance(query, _), do: query
1260 defp restrict_filtered(query, %{user: %User{} = user}) do
1261 case Filter.compose_regex(user) do
1266 from([activity, object] in query,
1268 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1269 activity.actor == ^user.ap_id
1274 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1275 restrict_filtered(query, %{user: user})
1278 defp restrict_filtered(query, _), do: query
1280 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1282 defp exclude_poll_votes(query, _) do
1283 if has_named_binding?(query, :object) do
1284 from([activity, object: o] in query,
1285 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1292 defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1293 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1295 defp exclude_invisible_actors(query, _opts) do
1297 |> join(:inner, [activity], u in User,
1299 on: activity.actor == u.ap_id and u.invisible == false
1303 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1304 from(activity in query, where: activity.id != ^id)
1307 defp exclude_id(query, _), do: query
1309 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1311 defp maybe_preload_objects(query, _) do
1313 |> Activity.with_preloaded_object()
1316 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1318 defp maybe_preload_bookmarks(query, opts) do
1320 |> Activity.with_preloaded_bookmark(opts[:user])
1323 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1325 |> Activity.with_preloaded_report_notes()
1328 defp maybe_preload_report_notes(query, _), do: query
1330 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1332 defp maybe_set_thread_muted_field(query, opts) do
1334 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1337 defp maybe_order(query, %{order: :desc}) do
1339 |> order_by(desc: :id)
1342 defp maybe_order(query, %{order: :asc}) do
1344 |> order_by(asc: :id)
1347 defp maybe_order(query, _), do: query
1349 defp normalize_fetch_activities_query_opts(opts) do
1350 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1352 value when is_bitstring(value) ->
1353 Map.put(opts, key, Hashtag.normalize_name(value))
1355 value when is_list(value) ->
1358 |> Enum.map(&Hashtag.normalize_name/1)
1361 Map.put(opts, key, normalized_value)
1369 defp fetch_activities_query_ap_ids_ops(opts) do
1370 source_user = opts[:muting_user]
1371 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1373 ap_id_relationships =
1374 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1375 [:block | ap_id_relationships]
1380 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1382 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1383 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1385 restrict_muted_reblogs_opts =
1386 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1388 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1391 def fetch_activities_query(recipients, opts \\ %{}) do
1392 opts = normalize_fetch_activities_query_opts(opts)
1394 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1395 fetch_activities_query_ap_ids_ops(opts)
1398 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1403 |> maybe_preload_objects(opts)
1404 |> maybe_preload_bookmarks(opts)
1405 |> maybe_preload_report_notes(opts)
1406 |> maybe_set_thread_muted_field(opts)
1407 |> maybe_order(opts)
1408 |> restrict_recipients_or_hashtags(recipients, opts[:user], opts[:followed_hashtags])
1409 |> restrict_replies(opts)
1410 |> restrict_since(opts)
1411 |> restrict_local(opts)
1412 |> restrict_remote(opts)
1413 |> restrict_actor(opts)
1414 |> restrict_type(opts)
1415 |> restrict_state(opts)
1416 |> restrict_favorited_by(opts)
1417 |> restrict_blocked(restrict_blocked_opts)
1418 |> restrict_blockers_visibility(opts)
1419 |> restrict_muted(restrict_muted_opts)
1420 |> restrict_filtered(opts)
1421 |> restrict_media(opts)
1422 |> restrict_visibility(opts)
1423 |> restrict_thread_visibility(opts, config)
1424 |> restrict_reblogs(opts)
1425 |> restrict_pinned(opts)
1426 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1427 |> restrict_instance(opts)
1428 |> restrict_announce_object_actor(opts)
1429 |> restrict_filtered(opts)
1430 |> maybe_restrict_deactivated_users(opts)
1431 |> exclude_poll_votes(opts)
1432 |> exclude_invisible_actors(opts)
1433 |> exclude_visibility(opts)
1435 if Config.feature_enabled?(:improved_hashtag_timeline) do
1437 |> restrict_hashtag_any(opts)
1438 |> restrict_hashtag_all(opts)
1439 |> restrict_hashtag_reject_any(opts)
1442 |> restrict_embedded_tag_any(opts)
1443 |> restrict_embedded_tag_all(opts)
1444 |> restrict_embedded_tag_reject_any(opts)
1449 Fetch favorites activities of user with order by sort adds to favorites
1451 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1452 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1454 |> Activity.Queries.by_actor()
1455 |> Activity.Queries.by_type("Like")
1456 |> Activity.with_joined_object()
1457 |> Object.with_joined_activity()
1458 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1459 |> order_by([like, _, _], desc_nulls_last: like.id)
1460 |> Pagination.fetch_paginated(
1461 Map.merge(params, %{skip_order: true}),
1466 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1467 Enum.map(activities, fn
1468 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1469 if Enum.any?(bcc, &(&1 in list_memberships)) do
1470 update_in(activity.data["cc"], &[user_ap_id | &1])
1480 defp maybe_update_cc(activities, _, _), do: activities
1482 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1483 from(activity in query,
1485 fragment("? && ?", activity.recipients, ^recipients) or
1486 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1487 ^Constants.as_public() in activity.recipients)
1491 def fetch_activities_bounded(
1493 recipients_with_public,
1495 pagination \\ :keyset
1497 fetch_activities_query([], opts)
1498 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1499 |> Pagination.fetch_paginated(opts, pagination)
1503 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1504 def upload(file, opts \\ []) do
1505 with {:ok, data} <- Upload.store(sanitize_upload_file(file), opts) do
1506 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1508 Repo.insert(%Object{data: obj_data})
1512 defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename) do
1515 | filename: Path.basename(filename)
1519 defp sanitize_upload_file(upload), do: upload
1521 @spec get_actor_url(any()) :: binary() | nil
1522 defp get_actor_url(url) when is_binary(url), do: url
1523 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1525 defp get_actor_url(url) when is_list(url) do
1531 defp get_actor_url(_url), do: nil
1533 defp normalize_image(%{"url" => url}) do
1536 "url" => [%{"href" => url}]
1540 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1541 defp normalize_image(_), do: nil
1543 defp normalize_also_known_as(aka) when is_list(aka), do: aka
1544 defp normalize_also_known_as(aka) when is_binary(aka), do: [aka]
1545 defp normalize_also_known_as(nil), do: []
1547 defp object_to_user_data(data, additional) do
1550 |> Map.get("attachment", [])
1551 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1552 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1556 |> Map.get("tag", [])
1558 %{"type" => "Emoji"} -> true
1561 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1562 {String.trim(name, ":"), url}
1565 is_locked = data["manuallyApprovesFollowers"] || false
1566 data = Transmogrifier.maybe_fix_user_object(data)
1567 is_discoverable = data["discoverable"] || false
1568 invisible = data["invisible"] || false
1569 actor_type = data["type"] || "Person"
1571 featured_address = data["featured"]
1572 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1575 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1576 data["publicKey"]["publicKeyPem"]
1580 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1581 data["endpoints"]["sharedInbox"]
1584 # if WebFinger request was already done, we probably have acct, otherwise
1585 # we request WebFinger here
1586 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1588 # also_known_as must be a URL
1591 |> Map.get("alsoKnownAs", [])
1592 |> normalize_also_known_as()
1593 |> Enum.filter(fn url ->
1594 case URI.parse(url) do
1595 %URI{scheme: "http"} -> true
1596 %URI{scheme: "https"} -> true
1603 uri: get_actor_url(data["url"]),
1605 banner: normalize_image(data["image"]),
1608 is_locked: is_locked,
1609 is_discoverable: is_discoverable,
1610 invisible: invisible,
1611 avatar: normalize_image(data["icon"]),
1613 follower_address: data["followers"],
1614 following_address: data["following"],
1615 featured_address: featured_address,
1616 bio: data["summary"] || "",
1617 actor_type: actor_type,
1618 also_known_as: also_known_as,
1619 public_key: public_key,
1620 inbox: data["inbox"],
1621 shared_inbox: shared_inbox,
1622 pinned_objects: pinned_objects,
1627 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1628 generated = "#{username}@#{URI.parse(data["id"]).host}"
1630 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1631 case WebFinger.finger(generated) do
1632 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1640 # nickname can be nil because of virtual actors
1641 defp generate_nickname(_), do: nil
1643 def fetch_follow_information_for_user(user) do
1644 with {:ok, following_data} <-
1645 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1646 {:ok, hide_follows} <- collection_private(following_data),
1647 {:ok, followers_data} <-
1648 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1649 {:ok, hide_followers} <- collection_private(followers_data) do
1652 hide_follows: hide_follows,
1653 follower_count: normalize_counter(followers_data["totalItems"]),
1654 following_count: normalize_counter(following_data["totalItems"]),
1655 hide_followers: hide_followers
1658 {:error, _} = e -> e
1663 defp normalize_counter(counter) when is_integer(counter), do: counter
1664 defp normalize_counter(_), do: 0
1666 def maybe_update_follow_information(user_data) do
1667 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1668 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1670 {:collections_available,
1671 !!(user_data[:following_address] && user_data[:follower_address])},
1673 fetch_follow_information_for_user(user_data) do
1674 info = Map.merge(user_data[:info] || %{}, info)
1677 |> Map.put(:info, info)
1679 {:user_type_check, false} ->
1682 {:collections_available, false} ->
1685 {:enabled, false} ->
1690 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1697 defp collection_private(%{"first" => %{"type" => type}})
1698 when type in ["CollectionPage", "OrderedCollectionPage"],
1701 defp collection_private(%{"first" => first}) do
1702 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1703 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1706 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1707 {:error, _} = e -> e
1712 defp collection_private(_data), do: {:ok, true}
1714 def user_data_from_user_object(data, additional \\ []) do
1715 with {:ok, data} <- MRF.filter(data) do
1716 {:ok, object_to_user_data(data, additional)}
1722 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1723 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1724 {:ok, data} <- user_data_from_user_object(data, additional) do
1725 {:ok, maybe_update_follow_information(data)}
1727 # If this has been deleted, only log a debug and not an error
1728 {:error, {"Object has been deleted", _, _} = e} ->
1729 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1732 {:error, {:reject, reason} = e} ->
1733 Logger.debug("Rejected user #{ap_id}: #{inspect(reason)}")
1737 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1742 def maybe_handle_clashing_nickname(data) do
1743 with nickname when is_binary(nickname) <- data[:nickname],
1744 %User{} = old_user <- User.get_by_nickname(nickname),
1745 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1747 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1751 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1752 |> User.update_and_set_cache()
1754 {:ap_id_comparison, true} ->
1756 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1764 def pin_data_from_featured_collection(%{
1765 "type" => "OrderedCollection",
1768 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1770 |> Map.get("orderedItems")
1771 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1774 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1779 def pin_data_from_featured_collection(
1784 when type in ["OrderedCollection", "Collection"] do
1785 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1787 # Items can either be a map _or_ a string
1790 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1791 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1795 def fetch_and_prepare_featured_from_ap_id(nil) do
1799 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1800 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1801 {:ok, pin_data_from_featured_collection(data)}
1804 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1809 def pinned_fetch_task(nil), do: nil
1811 def pinned_fetch_task(%{pinned_objects: pins}) do
1812 if Enum.all?(pins, fn {ap_id, _} ->
1813 Object.get_cached_by_ap_id(ap_id) ||
1814 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1822 def make_user_from_ap_id(ap_id, additional \\ []) do
1823 user = User.get_cached_by_ap_id(ap_id)
1825 if user && !User.ap_enabled?(user) do
1826 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1828 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1829 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1833 |> User.remote_user_changeset(data)
1834 |> User.update_and_set_cache()
1836 maybe_handle_clashing_nickname(data)
1839 |> User.remote_user_changeset()
1847 def make_user_from_nickname(nickname) do
1848 with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1849 WebFinger.finger(nickname) do
1850 make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1852 _e -> {:error, "No AP id in WebFinger"}
1856 # filter out broken threads
1857 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1858 entire_thread_visible_for_user?(activity, user)
1861 # do post-processing on a specific activity
1862 def contain_activity(%Activity{} = activity, %User{} = user) do
1863 contain_broken_threads(activity, user)
1866 def fetch_direct_messages_query do
1868 |> restrict_type(%{type: "Create"})
1869 |> restrict_visibility(%{visibility: "direct"})
1870 |> order_by([activity], asc: activity.id)
1873 defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
1875 defp maybe_restrict_deactivated_users(activity, _opts),
1876 do: Activity.restrict_deactivated_users(activity)