Extract deactivated users query to a join
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
55 {recipients, to, cc}
56 end
57
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
64 _ -> false
65 end
66 end
67
68 defp check_actor_can_insert(_), do: true
69
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
73 end
74
75 defp check_remote_limit(_), do: true
76
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
79 end
80
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
83 end
84
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
87 end
88
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
91 "type" => "Create"
92 }) do
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
95 end
96 end
97
98 defp increase_replies_count_if_reply(_create_data), do: :noop
99
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
101 @impl true
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
104 {:ok, object, meta}
105 end
106 end
107
108 @impl true
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
112 {:ok, activity} <-
113 Repo.insert(%Activity{
114 data: object,
115 local: local,
116 recipients: recipients,
117 actor: object["actor"]
118 }),
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
122 end
123 end
124
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
139
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
142 end)
143
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
146
147 {:ok, activity}
148 else
149 %Activity{} = activity ->
150 {:ok, activity}
151
152 {:actor_check, _} ->
153 {:error, false}
154
155 {:containment, _} = error ->
156 error
157
158 {:error, _} = error ->
159 error
160
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
163 data: map,
164 local: local,
165 actor: map["actor"],
166 recipients: recipients,
167 id: "pleroma:fakeid"
168 }
169
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
171 {:ok, activity}
172
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
175
176 {:reject, _} = e ->
177 {:error, e}
178 end
179 end
180
181 defp insert_activity_with_expiration(data, local, recipients) do
182 struct = %Activity{
183 data: data,
184 local: local,
185 actor: data["actor"],
186 recipients: recipients
187 }
188
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
191 end
192 end
193
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
196
197 original_activity =
198 case activity do
199 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
200 Activity.get_create_by_object_ap_id_with_object(id)
201
202 _ ->
203 activity
204 end
205
206 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
207 participations = get_participations(conversation)
208 stream_out(activity)
209 stream_out_participations(participations)
210 end
211
212 defp maybe_create_activity_expiration(
213 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
214 ) do
215 with {:ok, _job} <-
216 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
217 activity_id: activity.id,
218 expires_at: expires_at
219 }) do
220 {:ok, activity}
221 end
222 end
223
224 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
225
226 defp create_or_bump_conversation(activity, actor) do
227 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
228 %User{} = user <- User.get_cached_by_ap_id(actor) do
229 Participation.mark_as_read(user, conversation)
230 {:ok, conversation}
231 end
232 end
233
234 defp get_participations({:ok, conversation}) do
235 conversation
236 |> Repo.preload(:participations, force: true)
237 |> Map.get(:participations)
238 end
239
240 defp get_participations(_), do: []
241
242 def stream_out_participations(participations) do
243 participations =
244 participations
245 |> Repo.preload(:user)
246
247 Streamer.stream("participation", participations)
248 end
249
250 @impl true
251 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
252 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
253 conversation = Repo.preload(conversation, :participations)
254
255 last_activity_id =
256 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
257 user: user,
258 blocking_user: user
259 })
260
261 if last_activity_id do
262 stream_out_participations(conversation.participations)
263 end
264 end
265 end
266
267 @impl true
268 def stream_out_participations(_, _), do: :noop
269
270 @impl true
271 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
272 when data_type in ["Create", "Announce", "Delete", "Update"] do
273 activity
274 |> Topics.get_activity_topics()
275 |> Streamer.stream(activity)
276 end
277
278 @impl true
279 def stream_out(_activity) do
280 :noop
281 end
282
283 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
284 def create(params, fake \\ false) do
285 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
286 result
287 end
288 end
289
290 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
291 additional = params[:additional] || %{}
292 # only accept false as false value
293 local = !(params[:local] == false)
294 published = params[:published]
295 quick_insert? = Config.get([:env]) == :benchmark
296
297 create_data =
298 make_create_data(
299 %{to: to, actor: actor, published: published, context: context, object: object},
300 additional
301 )
302
303 with {:ok, activity} <- insert(create_data, local, fake),
304 {:fake, false, activity} <- {:fake, fake, activity},
305 _ <- increase_replies_count_if_reply(create_data),
306 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
307 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
308 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
309 _ <- notify_and_stream(activity),
310 :ok <- maybe_schedule_poll_notifications(activity),
311 :ok <- maybe_federate(activity) do
312 {:ok, activity}
313 else
314 {:quick_insert, true, activity} ->
315 {:ok, activity}
316
317 {:fake, true, activity} ->
318 {:ok, activity}
319
320 {:error, message} ->
321 Repo.rollback(message)
322 end
323 end
324
325 defp maybe_schedule_poll_notifications(activity) do
326 PollWorker.schedule_poll_end(activity)
327 :ok
328 end
329
330 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
331 {:ok, Activity.t()} | nil | {:error, any()}
332 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
333 with {:ok, result} <-
334 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
335 result
336 end
337 end
338
339 defp do_unfollow(follower, followed, activity_id, local)
340
341 defp do_unfollow(follower, followed, activity_id, local) when local == true do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
344 {:ok, activity} <- insert(unfollow_data, local),
345 {:ok, _activity} <- Repo.delete(follow_activity),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
348 {:ok, activity}
349 else
350 nil -> nil
351 {:error, error} -> Repo.rollback(error)
352 end
353 end
354
355 defp do_unfollow(follower, followed, activity_id, false) do
356 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
357 # uses deterministic ids for follows.
358 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
359 {:ok, _activity} <- Repo.delete(follow_activity),
360 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
361 unfollow_activity <- make_unfollow_activity(unfollow_data, false),
362 _ <- notify_and_stream(unfollow_activity) do
363 {:ok, unfollow_activity}
364 else
365 nil -> nil
366 {:error, error} -> Repo.rollback(error)
367 end
368 end
369
370 defp make_unfollow_activity(data, local) do
371 {recipients, _, _} = get_recipients(data)
372
373 %Activity{
374 data: data,
375 local: local,
376 actor: data["actor"],
377 recipients: recipients
378 }
379 end
380
381 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
382 def flag(params) do
383 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
384 result
385 end
386 end
387
388 defp do_flag(
389 %{
390 actor: actor,
391 context: _context,
392 account: account,
393 statuses: statuses,
394 content: content
395 } = params
396 ) do
397 # only accept false as false value
398 local = !(params[:local] == false)
399 forward = !(params[:forward] == false)
400
401 additional = params[:additional] || %{}
402
403 additional =
404 if forward do
405 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
406 else
407 Map.merge(additional, %{"to" => [], "cc" => []})
408 end
409
410 with flag_data <- make_flag_data(params, additional),
411 {:ok, activity} <- insert(flag_data, local),
412 {:ok, stripped_activity} <- strip_report_status_data(activity),
413 _ <- notify_and_stream(activity),
414 :ok <-
415 maybe_federate(stripped_activity) do
416 User.all_superusers()
417 |> Enum.filter(fn user -> user.ap_id != actor end)
418 |> Enum.filter(fn user -> not is_nil(user.email) end)
419 |> Enum.each(fn superuser ->
420 superuser
421 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
422 |> Pleroma.Emails.Mailer.deliver_async()
423 end)
424
425 {:ok, activity}
426 else
427 {:error, error} -> Repo.rollback(error)
428 end
429 end
430
431 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
432 def move(%User{} = origin, %User{} = target, local \\ true) do
433 params = %{
434 "type" => "Move",
435 "actor" => origin.ap_id,
436 "object" => origin.ap_id,
437 "target" => target.ap_id,
438 "to" => [origin.follower_address]
439 }
440
441 with true <- origin.ap_id in target.also_known_as,
442 {:ok, activity} <- insert(params, local),
443 _ <- notify_and_stream(activity) do
444 maybe_federate(activity)
445
446 BackgroundWorker.enqueue("move_following", %{
447 "origin_id" => origin.id,
448 "target_id" => target.id
449 })
450
451 {:ok, activity}
452 else
453 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
454 err -> err
455 end
456 end
457
458 def fetch_activities_for_context_query(context, opts) do
459 public = [Constants.as_public()]
460
461 recipients =
462 if opts[:user],
463 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
464 else: public
465
466 from(activity in Activity)
467 |> maybe_preload_objects(opts)
468 |> maybe_preload_bookmarks(opts)
469 |> maybe_set_thread_muted_field(opts)
470 |> restrict_blocked(opts)
471 |> restrict_blockers_visibility(opts)
472 |> restrict_recipients(recipients, opts[:user])
473 |> restrict_filtered(opts)
474 |> where(
475 [activity],
476 fragment(
477 "?->>'type' = ? and ?->>'context' = ?",
478 activity.data,
479 "Create",
480 activity.data,
481 ^context
482 )
483 )
484 |> exclude_poll_votes(opts)
485 |> exclude_id(opts)
486 |> order_by([activity], desc: activity.id)
487 end
488
489 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
490 def fetch_activities_for_context(context, opts \\ %{}) do
491 context
492 |> fetch_activities_for_context_query(opts)
493 |> Repo.all()
494 end
495
496 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
497 FlakeId.Ecto.CompatType.t() | nil
498 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
499 context
500 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
501 |> restrict_visibility(%{visibility: "direct"})
502 |> limit(1)
503 |> select([a], a.id)
504 |> Repo.one()
505 end
506
507 defp fetch_paginated_optimized(query, opts, pagination) do
508 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
509 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
510 IO.inspect(Repo.to_sql(:all, query))
511 opts = Map.put(opts, :skip_extra_order, true)
512
513 Pagination.fetch_paginated(query, opts, pagination)
514 end
515
516 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
517 list_memberships = Pleroma.List.memberships(opts[:user])
518
519 fetch_activities_query(recipients ++ list_memberships, opts)
520 |> fetch_paginated_optimized(opts, pagination)
521 |> Enum.reverse()
522 |> maybe_update_cc(list_memberships, opts[:user])
523 end
524
525 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
526 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
527 includes_local_public = Map.get(opts, :includes_local_public, false)
528
529 opts = Map.delete(opts, :user)
530
531 intended_recipients =
532 if includes_local_public do
533 [Constants.as_public(), as_local_public()]
534 else
535 [Constants.as_public()]
536 end
537
538 intended_recipients
539 |> fetch_activities_query(opts)
540 |> restrict_unlisted(opts)
541 |> fetch_paginated_optimized(opts, pagination)
542 end
543
544 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
545 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
546 opts
547 |> Map.put(:restrict_unlisted, true)
548 |> fetch_public_or_unlisted_activities(pagination)
549 end
550
551 @valid_visibilities ~w[direct unlisted public private]
552
553 defp restrict_visibility(query, %{visibility: visibility})
554 when is_list(visibility) do
555 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
556 from(
557 a in query,
558 where:
559 fragment(
560 "activity_visibility(?, ?, ?) = ANY (?)",
561 a.actor,
562 a.recipients,
563 a.data,
564 ^visibility
565 )
566 )
567 else
568 Logger.error("Could not restrict visibility to #{visibility}")
569 end
570 end
571
572 defp restrict_visibility(query, %{visibility: visibility})
573 when visibility in @valid_visibilities do
574 from(
575 a in query,
576 where:
577 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
578 )
579 end
580
581 defp restrict_visibility(_query, %{visibility: visibility})
582 when visibility not in @valid_visibilities do
583 Logger.error("Could not restrict visibility to #{visibility}")
584 end
585
586 defp restrict_visibility(query, _visibility), do: query
587
588 defp exclude_visibility(query, %{exclude_visibilities: visibility})
589 when is_list(visibility) do
590 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
591 from(
592 a in query,
593 where:
594 not fragment(
595 "activity_visibility(?, ?, ?) = ANY (?)",
596 a.actor,
597 a.recipients,
598 a.data,
599 ^visibility
600 )
601 )
602 else
603 Logger.error("Could not exclude visibility to #{visibility}")
604 query
605 end
606 end
607
608 defp exclude_visibility(query, %{exclude_visibilities: visibility})
609 when visibility in @valid_visibilities do
610 from(
611 a in query,
612 where:
613 not fragment(
614 "activity_visibility(?, ?, ?) = ?",
615 a.actor,
616 a.recipients,
617 a.data,
618 ^visibility
619 )
620 )
621 end
622
623 defp exclude_visibility(query, %{exclude_visibilities: visibility})
624 when visibility not in [nil | @valid_visibilities] do
625 Logger.error("Could not exclude visibility to #{visibility}")
626 query
627 end
628
629 defp exclude_visibility(query, _visibility), do: query
630
631 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
632 do: query
633
634 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
635 do: query
636
637 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
638 local_public = as_local_public()
639
640 from(
641 a in query,
642 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
643 )
644 end
645
646 defp restrict_thread_visibility(query, _, _), do: query
647
648 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
649 params =
650 params
651 |> Map.put(:user, reading_user)
652 |> Map.put(:actor_id, user.ap_id)
653
654 %{
655 godmode: params[:godmode],
656 reading_user: reading_user
657 }
658 |> user_activities_recipients()
659 |> fetch_activities(params)
660 |> Enum.reverse()
661 end
662
663 def fetch_user_activities(user, reading_user, params \\ %{})
664
665 def fetch_user_activities(user, reading_user, %{total: true} = params) do
666 result = fetch_activities_for_user(user, reading_user, params)
667
668 Keyword.put(result, :items, Enum.reverse(result[:items]))
669 end
670
671 def fetch_user_activities(user, reading_user, params) do
672 user
673 |> fetch_activities_for_user(reading_user, params)
674 |> Enum.reverse()
675 end
676
677 defp fetch_activities_for_user(user, reading_user, params) do
678 params =
679 params
680 |> Map.put(:type, ["Create", "Announce"])
681 |> Map.put(:user, reading_user)
682 |> Map.put(:actor_id, user.ap_id)
683 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
684
685 params =
686 if User.blocks?(reading_user, user) do
687 params
688 else
689 params
690 |> Map.put(:blocking_user, reading_user)
691 |> Map.put(:muting_user, reading_user)
692 end
693
694 pagination_type = Map.get(params, :pagination_type) || :keyset
695
696 %{
697 godmode: params[:godmode],
698 reading_user: reading_user
699 }
700 |> user_activities_recipients()
701 |> fetch_activities(params, pagination_type)
702 end
703
704 def fetch_statuses(reading_user, %{total: true} = params) do
705 result = fetch_activities_for_reading_user(reading_user, params)
706 Keyword.put(result, :items, Enum.reverse(result[:items]))
707 end
708
709 def fetch_statuses(reading_user, params) do
710 reading_user
711 |> fetch_activities_for_reading_user(params)
712 |> Enum.reverse()
713 end
714
715 defp fetch_activities_for_reading_user(reading_user, params) do
716 params = Map.put(params, :type, ["Create", "Announce"])
717
718 %{
719 godmode: params[:godmode],
720 reading_user: reading_user
721 }
722 |> user_activities_recipients()
723 |> fetch_activities(params, :offset)
724 end
725
726 defp user_activities_recipients(%{godmode: true}), do: []
727
728 defp user_activities_recipients(%{reading_user: reading_user}) do
729 if not is_nil(reading_user) and reading_user.local do
730 [
731 Constants.as_public(),
732 as_local_public(),
733 reading_user.ap_id | User.following(reading_user)
734 ]
735 else
736 [Constants.as_public()]
737 end
738 end
739
740 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
741 raise "Can't use the child object without preloading!"
742 end
743
744 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
745 from(
746 [activity, object] in query,
747 where:
748 fragment(
749 "?->>'type' != ? or ?->>'actor' != ?",
750 activity.data,
751 "Announce",
752 object.data,
753 ^actor
754 )
755 )
756 end
757
758 defp restrict_announce_object_actor(query, _), do: query
759
760 defp restrict_since(query, %{since_id: ""}), do: query
761
762 defp restrict_since(query, %{since_id: since_id}) do
763 from(activity in query, where: activity.id > ^since_id)
764 end
765
766 defp restrict_since(query, _), do: query
767
768 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
769 raise_on_missing_preload()
770 end
771
772 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
773 from(
774 [_activity, object] in query,
775 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
776 )
777 end
778
779 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
780 restrict_embedded_tag_any(query, %{tag: tag})
781 end
782
783 defp restrict_embedded_tag_all(query, _), do: query
784
785 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
786 raise_on_missing_preload()
787 end
788
789 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
790 from(
791 [_activity, object] in query,
792 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
793 )
794 end
795
796 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
797 restrict_embedded_tag_any(query, %{tag: [tag]})
798 end
799
800 defp restrict_embedded_tag_any(query, _), do: query
801
802 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
803 raise_on_missing_preload()
804 end
805
806 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
807 from(
808 [_activity, object] in query,
809 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
810 )
811 end
812
813 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
814 when is_binary(tag_reject) do
815 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
816 end
817
818 defp restrict_embedded_tag_reject_any(query, _), do: query
819
820 defp object_ids_query_for_tags(tags) do
821 from(hto in "hashtags_objects")
822 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
823 |> where([hto, ht], ht.name in ^tags)
824 |> select([hto], hto.object_id)
825 |> distinct([hto], true)
826 end
827
828 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
829 raise_on_missing_preload()
830 end
831
832 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
833 restrict_hashtag_any(query, %{tag: single_tag})
834 end
835
836 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
837 from(
838 [_activity, object] in query,
839 where:
840 fragment(
841 """
842 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
843 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
844 AND hashtags_objects.object_id = ?) @> ?
845 """,
846 ^tags,
847 object.id,
848 ^tags
849 )
850 )
851 end
852
853 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
854 restrict_hashtag_all(query, %{tag_all: [tag]})
855 end
856
857 defp restrict_hashtag_all(query, _), do: query
858
859 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
860 raise_on_missing_preload()
861 end
862
863 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
864 hashtag_ids =
865 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
866 |> Repo.all()
867
868 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
869 from(
870 [_activity, object] in query,
871 join: hto in "hashtags_objects",
872 on: hto.object_id == object.id,
873 where: hto.hashtag_id in ^hashtag_ids,
874 distinct: [desc: object.id],
875 order_by: [desc: object.id]
876 )
877 end
878
879 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
880 restrict_hashtag_any(query, %{tag: [tag]})
881 end
882
883 defp restrict_hashtag_any(query, _), do: query
884
885 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
886 raise_on_missing_preload()
887 end
888
889 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
890 from(
891 [_activity, object] in query,
892 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
893 )
894 end
895
896 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
897 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
898 end
899
900 defp restrict_hashtag_reject_any(query, _), do: query
901
902 defp raise_on_missing_preload do
903 raise "Can't use the child object without preloading!"
904 end
905
906 defp restrict_recipients(query, [], _user), do: query
907
908 defp restrict_recipients(query, recipients, nil) do
909 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
910 end
911
912 defp restrict_recipients(query, recipients, user) do
913 from(
914 activity in query,
915 where: fragment("? && ?", ^recipients, activity.recipients),
916 or_where: activity.actor == ^user.ap_id
917 )
918 end
919
920 defp restrict_local(query, %{local_only: true}) do
921 from(activity in query, where: activity.local == true)
922 end
923
924 defp restrict_local(query, _), do: query
925
926 defp restrict_remote(query, %{remote: true}) do
927 from(activity in query, where: activity.local == false)
928 end
929
930 defp restrict_remote(query, _), do: query
931
932 defp restrict_actor(query, %{actor_id: actor_id}) do
933 from(activity in query, where: activity.actor == ^actor_id)
934 end
935
936 defp restrict_actor(query, _), do: query
937
938 defp restrict_type(query, %{type: type}) when is_binary(type) do
939 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
940 end
941
942 defp restrict_type(query, %{type: type}) do
943 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
944 end
945
946 defp restrict_type(query, _), do: query
947
948 defp restrict_state(query, %{state: state}) do
949 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
950 end
951
952 defp restrict_state(query, _), do: query
953
954 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
955 from(
956 [_activity, object] in query,
957 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
958 )
959 end
960
961 defp restrict_favorited_by(query, _), do: query
962
963 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
964 raise "Can't use the child object without preloading!"
965 end
966
967 defp restrict_media(query, %{only_media: true}) do
968 from(
969 [activity, object] in query,
970 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
971 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
972 )
973 end
974
975 defp restrict_media(query, _), do: query
976
977 defp restrict_replies(query, %{exclude_replies: true}) do
978 from(
979 [_activity, object] in query,
980 where: fragment("?->>'inReplyTo' is null", object.data)
981 )
982 end
983
984 defp restrict_replies(query, %{
985 reply_filtering_user: %User{} = user,
986 reply_visibility: "self"
987 }) do
988 from(
989 [activity, object] in query,
990 where:
991 fragment(
992 "?->>'inReplyTo' is null OR ? = ANY(?)",
993 object.data,
994 ^user.ap_id,
995 activity.recipients
996 )
997 )
998 end
999
1000 defp restrict_replies(query, %{
1001 reply_filtering_user: %User{} = user,
1002 reply_visibility: "following"
1003 }) do
1004 from(
1005 [activity, object] in query,
1006 where:
1007 fragment(
1008 """
1009 ?->>'type' != 'Create' -- This isn't a Create
1010 OR ?->>'inReplyTo' is null -- this isn't a reply
1011 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1012 -- unless they are the author (because authors
1013 -- are also part of the recipients). This leads
1014 -- to a bug that self-replies by friends won't
1015 -- show up.
1016 OR ? = ? -- The actor is us
1017 """,
1018 activity.data,
1019 object.data,
1020 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1021 activity.recipients,
1022 activity.actor,
1023 activity.actor,
1024 ^user.ap_id
1025 )
1026 )
1027 end
1028
1029 defp restrict_replies(query, _), do: query
1030
1031 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1032 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1033 end
1034
1035 defp restrict_reblogs(query, _), do: query
1036
1037 defp restrict_muted(query, %{with_muted: true}), do: query
1038
1039 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1040 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1041
1042 query =
1043 from([activity] in query,
1044 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1045 where:
1046 fragment(
1047 "not (?->'to' \\?| ?) or ? = ?",
1048 activity.data,
1049 ^mutes,
1050 activity.actor,
1051 ^user.ap_id
1052 )
1053 )
1054
1055 unless opts[:skip_preload] do
1056 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1057 else
1058 query
1059 end
1060 end
1061
1062 defp restrict_muted(query, _), do: query
1063
1064 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1065 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1066 domain_blocks = user.domain_blocks || []
1067
1068 following_ap_ids = User.get_friends_ap_ids(user)
1069
1070 query =
1071 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1072
1073 from(
1074 [activity, object: o] in query,
1075 # You don't block the author
1076 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1077
1078 # You don't block any recipients, and didn't author the post
1079 where:
1080 fragment(
1081 "((not (? && ?)) or ? = ?)",
1082 activity.recipients,
1083 ^blocked_ap_ids,
1084 activity.actor,
1085 ^user.ap_id
1086 ),
1087
1088 # You don't block the domain of any recipients, and didn't author the post
1089 where:
1090 fragment(
1091 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1092 activity.recipients,
1093 ^domain_blocks,
1094 activity.actor,
1095 ^user.ap_id
1096 ),
1097
1098 # It's not a boost of a user you block
1099 where:
1100 fragment(
1101 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1102 activity.data,
1103 activity.data,
1104 ^blocked_ap_ids
1105 ),
1106
1107 # You don't block the author's domain, and also don't follow the author
1108 where:
1109 fragment(
1110 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1111 activity.actor,
1112 ^domain_blocks,
1113 activity.actor,
1114 ^following_ap_ids
1115 ),
1116
1117 # Same as above, but checks the Object
1118 where:
1119 fragment(
1120 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1121 o.data,
1122 ^domain_blocks,
1123 o.data,
1124 ^following_ap_ids
1125 )
1126 )
1127 end
1128
1129 defp restrict_blocked(query, _), do: query
1130
1131 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1132 if Config.get([:activitypub, :blockers_visible]) == true do
1133 query
1134 else
1135 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1136
1137 from(
1138 activity in query,
1139 # The author doesn't block you
1140 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1141
1142 # It's not a boost of a user that blocks you
1143 where:
1144 fragment(
1145 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1146 activity.data,
1147 activity.data,
1148 ^blocker_ap_ids
1149 )
1150 )
1151 end
1152 end
1153
1154 defp restrict_blockers_visibility(query, _), do: query
1155
1156 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1157 from(
1158 activity in query,
1159 where:
1160 fragment(
1161 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1162 activity.data,
1163 ^[Constants.as_public()]
1164 )
1165 )
1166 end
1167
1168 defp restrict_unlisted(query, _), do: query
1169
1170 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1171 from(
1172 [activity, object: o] in query,
1173 where:
1174 fragment(
1175 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1176 activity.data,
1177 activity.data,
1178 activity.data,
1179 ^ids
1180 )
1181 )
1182 end
1183
1184 defp restrict_pinned(query, _), do: query
1185
1186 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1187 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1188
1189 from(
1190 activity in query,
1191 where:
1192 fragment(
1193 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1194 activity.data,
1195 activity.actor,
1196 ^muted_reblogs
1197 )
1198 )
1199 end
1200
1201 defp restrict_muted_reblogs(query, _), do: query
1202
1203 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1204 from(
1205 activity in query,
1206 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1207 )
1208 end
1209
1210 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1211 from(
1212 activity in query,
1213 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1214 )
1215 end
1216
1217 defp restrict_instance(query, _), do: query
1218
1219 defp restrict_filtered(query, %{user: %User{} = user}) do
1220 case Filter.compose_regex(user) do
1221 nil ->
1222 query
1223
1224 regex ->
1225 from([activity, object] in query,
1226 where:
1227 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1228 activity.actor == ^user.ap_id
1229 )
1230 end
1231 end
1232
1233 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1234 restrict_filtered(query, %{user: user})
1235 end
1236
1237 defp restrict_filtered(query, _), do: query
1238
1239 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1240
1241 defp exclude_poll_votes(query, _) do
1242 if has_named_binding?(query, :object) do
1243 from([activity, object: o] in query,
1244 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1245 )
1246 else
1247 query
1248 end
1249 end
1250
1251 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1252
1253 defp exclude_invisible_actors(query, _opts) do
1254 invisible_ap_ids =
1255 User.Query.build(%{invisible: true, select: [:ap_id]})
1256 |> Repo.all()
1257 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1258
1259 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1260 end
1261
1262 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1263 from(activity in query, where: activity.id != ^id)
1264 end
1265
1266 defp exclude_id(query, _), do: query
1267
1268 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1269
1270 defp maybe_preload_objects(query, _) do
1271 query
1272 |> Activity.with_preloaded_object()
1273 end
1274
1275 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1276
1277 defp maybe_preload_bookmarks(query, opts) do
1278 query
1279 |> Activity.with_preloaded_bookmark(opts[:user])
1280 end
1281
1282 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1283 query
1284 |> Activity.with_preloaded_report_notes()
1285 end
1286
1287 defp maybe_preload_report_notes(query, _), do: query
1288
1289 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1290
1291 defp maybe_set_thread_muted_field(query, opts) do
1292 query
1293 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1294 end
1295
1296 defp maybe_order(query, %{order: :desc}) do
1297 query
1298 |> order_by(desc: :id)
1299 end
1300
1301 defp maybe_order(query, %{order: :asc}) do
1302 query
1303 |> order_by(asc: :id)
1304 end
1305
1306 defp maybe_order(query, _), do: query
1307
1308 defp normalize_fetch_activities_query_opts(opts) do
1309 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1310 case opts[key] do
1311 value when is_bitstring(value) ->
1312 Map.put(opts, key, Hashtag.normalize_name(value))
1313
1314 value when is_list(value) ->
1315 normalized_value =
1316 value
1317 |> Enum.map(&Hashtag.normalize_name/1)
1318 |> Enum.uniq()
1319
1320 Map.put(opts, key, normalized_value)
1321
1322 _ ->
1323 opts
1324 end
1325 end)
1326 end
1327
1328 defp fetch_activities_query_ap_ids_ops(opts) do
1329 source_user = opts[:muting_user]
1330 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1331
1332 ap_id_relationships =
1333 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1334 [:block | ap_id_relationships]
1335 else
1336 ap_id_relationships
1337 end
1338
1339 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1340
1341 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1342 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1343
1344 restrict_muted_reblogs_opts =
1345 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1346
1347 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1348 end
1349
1350 def fetch_activities_query(recipients, opts \\ %{}) do
1351 opts = normalize_fetch_activities_query_opts(opts)
1352
1353 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1354 fetch_activities_query_ap_ids_ops(opts)
1355
1356 config = %{
1357 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1358 }
1359
1360 query =
1361 Activity
1362 |> maybe_preload_objects(opts)
1363 |> maybe_preload_bookmarks(opts)
1364 |> maybe_preload_report_notes(opts)
1365 |> maybe_set_thread_muted_field(opts)
1366 |> maybe_order(opts)
1367 |> restrict_recipients(recipients, opts[:user])
1368 |> restrict_replies(opts)
1369 |> restrict_since(opts)
1370 |> restrict_local(opts)
1371 |> restrict_remote(opts)
1372 |> restrict_actor(opts)
1373 |> restrict_type(opts)
1374 |> restrict_state(opts)
1375 |> restrict_favorited_by(opts)
1376 |> restrict_blocked(restrict_blocked_opts)
1377 |> restrict_blockers_visibility(opts)
1378 |> restrict_muted(restrict_muted_opts)
1379 |> restrict_filtered(opts)
1380 |> restrict_media(opts)
1381 |> restrict_visibility(opts)
1382 |> restrict_thread_visibility(opts, config)
1383 |> restrict_reblogs(opts)
1384 |> restrict_pinned(opts)
1385 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1386 |> restrict_instance(opts)
1387 |> restrict_announce_object_actor(opts)
1388 |> restrict_filtered(opts)
1389 |> Activity.restrict_deactivated_users()
1390 |> exclude_poll_votes(opts)
1391 |> exclude_invisible_actors(opts)
1392 |> exclude_visibility(opts)
1393
1394 if Config.feature_enabled?(:improved_hashtag_timeline) do
1395 query
1396 |> restrict_hashtag_any(opts)
1397 |> restrict_hashtag_all(opts)
1398 |> restrict_hashtag_reject_any(opts)
1399 else
1400 query
1401 |> restrict_embedded_tag_any(opts)
1402 |> restrict_embedded_tag_all(opts)
1403 |> restrict_embedded_tag_reject_any(opts)
1404 end
1405 end
1406
1407 @doc """
1408 Fetch favorites activities of user with order by sort adds to favorites
1409 """
1410 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1411 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1412 user.ap_id
1413 |> Activity.Queries.by_actor()
1414 |> Activity.Queries.by_type("Like")
1415 |> Activity.with_joined_object()
1416 |> Object.with_joined_activity()
1417 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1418 |> order_by([like, _, _], desc_nulls_last: like.id)
1419 |> Pagination.fetch_paginated(
1420 Map.merge(params, %{skip_order: true}),
1421 pagination
1422 )
1423 end
1424
1425 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1426 Enum.map(activities, fn
1427 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1428 if Enum.any?(bcc, &(&1 in list_memberships)) do
1429 update_in(activity.data["cc"], &[user_ap_id | &1])
1430 else
1431 activity
1432 end
1433
1434 activity ->
1435 activity
1436 end)
1437 end
1438
1439 defp maybe_update_cc(activities, _, _), do: activities
1440
1441 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1442 from(activity in query,
1443 where:
1444 fragment("? && ?", activity.recipients, ^recipients) or
1445 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1446 ^Constants.as_public() in activity.recipients)
1447 )
1448 end
1449
1450 def fetch_activities_bounded(
1451 recipients,
1452 recipients_with_public,
1453 opts \\ %{},
1454 pagination \\ :keyset
1455 ) do
1456 fetch_activities_query([], opts)
1457 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1458 |> Pagination.fetch_paginated(opts, pagination)
1459 |> Enum.reverse()
1460 end
1461
1462 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1463 def upload(file, opts \\ []) do
1464 with {:ok, data} <- Upload.store(file, opts) do
1465 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1466
1467 Repo.insert(%Object{data: obj_data})
1468 end
1469 end
1470
1471 @spec get_actor_url(any()) :: binary() | nil
1472 defp get_actor_url(url) when is_binary(url), do: url
1473 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1474
1475 defp get_actor_url(url) when is_list(url) do
1476 url
1477 |> List.first()
1478 |> get_actor_url()
1479 end
1480
1481 defp get_actor_url(_url), do: nil
1482
1483 defp normalize_image(%{"url" => url}) do
1484 %{
1485 "type" => "Image",
1486 "url" => [%{"href" => url}]
1487 }
1488 end
1489
1490 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1491 defp normalize_image(_), do: nil
1492
1493 defp object_to_user_data(data, additional) do
1494 fields =
1495 data
1496 |> Map.get("attachment", [])
1497 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1498 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1499
1500 emojis =
1501 data
1502 |> Map.get("tag", [])
1503 |> Enum.filter(fn
1504 %{"type" => "Emoji"} -> true
1505 _ -> false
1506 end)
1507 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1508 {String.trim(name, ":"), url}
1509 end)
1510
1511 is_locked = data["manuallyApprovesFollowers"] || false
1512 data = Transmogrifier.maybe_fix_user_object(data)
1513 is_discoverable = data["discoverable"] || false
1514 invisible = data["invisible"] || false
1515 actor_type = data["type"] || "Person"
1516
1517 featured_address = data["featured"]
1518 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1519
1520 public_key =
1521 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1522 data["publicKey"]["publicKeyPem"]
1523 end
1524
1525 shared_inbox =
1526 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1527 data["endpoints"]["sharedInbox"]
1528 end
1529
1530 # if WebFinger request was already done, we probably have acct, otherwise
1531 # we request WebFinger here
1532 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1533
1534 %{
1535 ap_id: data["id"],
1536 uri: get_actor_url(data["url"]),
1537 ap_enabled: true,
1538 banner: normalize_image(data["image"]),
1539 fields: fields,
1540 emoji: emojis,
1541 is_locked: is_locked,
1542 is_discoverable: is_discoverable,
1543 invisible: invisible,
1544 avatar: normalize_image(data["icon"]),
1545 name: data["name"],
1546 follower_address: data["followers"],
1547 following_address: data["following"],
1548 featured_address: featured_address,
1549 bio: data["summary"] || "",
1550 actor_type: actor_type,
1551 also_known_as: Map.get(data, "alsoKnownAs", []),
1552 public_key: public_key,
1553 inbox: data["inbox"],
1554 shared_inbox: shared_inbox,
1555 pinned_objects: pinned_objects,
1556 nickname: nickname
1557 }
1558 end
1559
1560 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1561 generated = "#{username}@#{URI.parse(data["id"]).host}"
1562
1563 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1564 case WebFinger.finger(generated) do
1565 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1566 _ -> generated
1567 end
1568 else
1569 generated
1570 end
1571 end
1572
1573 # nickname can be nil because of virtual actors
1574 defp generate_nickname(_), do: nil
1575
1576 def fetch_follow_information_for_user(user) do
1577 with {:ok, following_data} <-
1578 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1579 {:ok, hide_follows} <- collection_private(following_data),
1580 {:ok, followers_data} <-
1581 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1582 {:ok, hide_followers} <- collection_private(followers_data) do
1583 {:ok,
1584 %{
1585 hide_follows: hide_follows,
1586 follower_count: normalize_counter(followers_data["totalItems"]),
1587 following_count: normalize_counter(following_data["totalItems"]),
1588 hide_followers: hide_followers
1589 }}
1590 else
1591 {:error, _} = e -> e
1592 e -> {:error, e}
1593 end
1594 end
1595
1596 defp normalize_counter(counter) when is_integer(counter), do: counter
1597 defp normalize_counter(_), do: 0
1598
1599 def maybe_update_follow_information(user_data) do
1600 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1601 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1602 {_, true} <-
1603 {:collections_available,
1604 !!(user_data[:following_address] && user_data[:follower_address])},
1605 {:ok, info} <-
1606 fetch_follow_information_for_user(user_data) do
1607 info = Map.merge(user_data[:info] || %{}, info)
1608
1609 user_data
1610 |> Map.put(:info, info)
1611 else
1612 {:user_type_check, false} ->
1613 user_data
1614
1615 {:collections_available, false} ->
1616 user_data
1617
1618 {:enabled, false} ->
1619 user_data
1620
1621 e ->
1622 Logger.error(
1623 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1624 )
1625
1626 user_data
1627 end
1628 end
1629
1630 defp collection_private(%{"first" => %{"type" => type}})
1631 when type in ["CollectionPage", "OrderedCollectionPage"],
1632 do: {:ok, false}
1633
1634 defp collection_private(%{"first" => first}) do
1635 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1636 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1637 {:ok, false}
1638 else
1639 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1640 {:error, _} = e -> e
1641 e -> {:error, e}
1642 end
1643 end
1644
1645 defp collection_private(_data), do: {:ok, true}
1646
1647 def user_data_from_user_object(data, additional \\ []) do
1648 with {:ok, data} <- MRF.filter(data) do
1649 {:ok, object_to_user_data(data, additional)}
1650 else
1651 e -> {:error, e}
1652 end
1653 end
1654
1655 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1656 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1657 {:ok, data} <- user_data_from_user_object(data, additional) do
1658 {:ok, maybe_update_follow_information(data)}
1659 else
1660 # If this has been deleted, only log a debug and not an error
1661 {:error, "Object has been deleted" = e} ->
1662 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1663 {:error, e}
1664
1665 {:error, {:reject, reason} = e} ->
1666 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1667 {:error, e}
1668
1669 {:error, e} ->
1670 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1671 {:error, e}
1672 end
1673 end
1674
1675 def maybe_handle_clashing_nickname(data) do
1676 with nickname when is_binary(nickname) <- data[:nickname],
1677 %User{} = old_user <- User.get_by_nickname(nickname),
1678 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1679 Logger.info(
1680 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1681 )
1682
1683 old_user
1684 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1685 |> User.update_and_set_cache()
1686 else
1687 {:ap_id_comparison, true} ->
1688 Logger.info(
1689 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1690 )
1691
1692 _ ->
1693 nil
1694 end
1695 end
1696
1697 def pin_data_from_featured_collection(%{
1698 "type" => "OrderedCollection",
1699 "first" => first
1700 }) do
1701 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1702 page
1703 |> Map.get("orderedItems")
1704 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1705 else
1706 e ->
1707 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1708 {:ok, %{}}
1709 end
1710 end
1711
1712 def pin_data_from_featured_collection(
1713 %{
1714 "type" => type
1715 } = collection
1716 )
1717 when type in ["OrderedCollection", "Collection"] do
1718 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1719
1720 # Items can either be a map _or_ a string
1721 objects
1722 |> Map.new(fn
1723 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1724 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1725 end)
1726 end
1727
1728 def fetch_and_prepare_featured_from_ap_id(nil) do
1729 {:ok, %{}}
1730 end
1731
1732 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1733 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1734 {:ok, pin_data_from_featured_collection(data)}
1735 else
1736 e ->
1737 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1738 {:ok, %{}}
1739 end
1740 end
1741
1742 def pinned_fetch_task(nil), do: nil
1743
1744 def pinned_fetch_task(%{pinned_objects: pins}) do
1745 if Enum.all?(pins, fn {ap_id, _} ->
1746 Object.get_cached_by_ap_id(ap_id) ||
1747 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1748 end) do
1749 :ok
1750 else
1751 :error
1752 end
1753 end
1754
1755 def make_user_from_ap_id(ap_id, additional \\ []) do
1756 user = User.get_cached_by_ap_id(ap_id)
1757
1758 if user && !User.ap_enabled?(user) do
1759 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1760 else
1761 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1762 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1763
1764 if user do
1765 user
1766 |> User.remote_user_changeset(data)
1767 |> User.update_and_set_cache()
1768 else
1769 maybe_handle_clashing_nickname(data)
1770
1771 data
1772 |> User.remote_user_changeset()
1773 |> Repo.insert()
1774 |> User.set_cache()
1775 end
1776 end
1777 end
1778 end
1779
1780 def make_user_from_nickname(nickname) do
1781 with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1782 WebFinger.finger(nickname) do
1783 make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1784 else
1785 _e -> {:error, "No AP id in WebFinger"}
1786 end
1787 end
1788
1789 # filter out broken threads
1790 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1791 entire_thread_visible_for_user?(activity, user)
1792 end
1793
1794 # do post-processing on a specific activity
1795 def contain_activity(%Activity{} = activity, %User{} = user) do
1796 contain_broken_threads(activity, user)
1797 end
1798
1799 def fetch_direct_messages_query do
1800 Activity
1801 |> restrict_type(%{type: "Create"})
1802 |> restrict_visibility(%{visibility: "direct"})
1803 |> order_by([activity], asc: activity.id)
1804 end
1805 end