8233bcbf8f36edb05bf6be51ac5079a6241e4df4
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw values of the
# corresponding keys (defaulting to `[]`) and `recipients` is everyone the
# activity is addressed to.
#
# For "Create" activities the actor is added to the recipients and duplicates
# are removed; for every other type the lists are concatenated as-is.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing (or nil) actor contributes nothing. Defaulting it to `[]` and
  # wrapping it in a list would add an empty list as a bogus recipient.
  actor =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Decides whether the activity's actor may insert it.
#
# "Delete" and "Undo" are always allowed. Otherwise, when "actor" is a binary,
# the cached user for that AP id must exist and be active. Activities with no
# binary actor are allowed.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_activity), do: true
69
# Returns true when the embedded object's content is no longer than the
# configured `[:instance, :remote_limit]`. Activities whose object has no
# (or nil) content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_activity), do: true
76
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the user.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
80
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the user.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
84
# Refreshes the actor's last-status timestamp when `object` is public;
# otherwise returns `{:ok, actor}` without touching the user.
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88
# For a "Create" whose object has an "inReplyTo", increments the replies
# counter of the replied-to object, but only when the new object is public
# (returns nil otherwise). Any other data is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_data), do: :noop
99
@object_types ~w[Question Answer Audio Video Event Article Note Page]
# Persists data coming through the pipeline and returns `{:ok, stored, meta}`.
#
# Bare objects (types in @object_types) are stored through `Object.create/1`.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

@unpersisted_activity_types ~w[Undo Delete Remove Accept Reject]
# Activities of these types are not written to the database when `meta` is
# exactly `[local: false]`; an in-memory %Activity{} is returned instead.
@impl true
def persist(%{"type" => type} = object, [local: false] = meta)
    when type in @unpersisted_activity_types do
  {recipients, _, _} = get_recipients(object)

  unpersisted = %Activity{
    data: object,
    local: false,
    recipients: recipients,
    actor: object["actor"]
  }

  {:ok, unpersisted, meta}
end

# Everything else is inserted as an activity. `meta` must contain `:local`
# (raises otherwise). A failing insert or expiration job is returned as-is.
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _, _} = get_recipients(object)

  with {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }),
       # TODO: add tests for expired activities, when Note type will be supported in new pipeline
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
141
# Inserts an activity described by `map`.
#
# If `Activity.normalize/1` already finds an activity for `map`, that one is
# returned. Otherwise the data goes through the actor check (skippable via
# `bypass_actor_check`), the remote content limit, MRF filtering and
# containment before the embedded object and the activity are stored.
#
# With `fake: true` nothing is stored: an %Activity{} with the id
# "pleroma:fakeid" is built and returned after the MRF step.
#
# NOTE(review): a containment failure is returned as `{:containment, _}`,
# which the @spec below does not list.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  alias Pleroma.Web.RichMedia.Helpers, as: RichMedia

  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {:actor_check, true} <-
         {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
       {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    ConcurrentLimiter.limit(RichMedia, fn ->
      Task.start(fn -> RichMedia.fetch_data_for_activity(activity) end)
    end)

    # Add local posts to search index
    if local do
      Pleroma.Search.add_to_index(activity)
    end

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      RichMedia.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:containment, _} = containment_error ->
      containment_error

    {:error, _} = error ->
      error
  end
end
197
# Inserts a new activity row and, on success, schedules its expiration if the
# data asks for one. A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
210
# Creates notifications for `activity`, bumps the related conversation and
# pushes the activity plus the conversation's participations to the streamer.
#
# For an "Update" with a loaded object, the conversation is looked up through
# the object's Create activity rather than through the Update itself.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  original_activity =
    case activity do
      %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
        # The lookup may come back empty; fall back to the activity itself so
        # the `.actor` access below cannot blow up on nil. This treats the
        # Update like every non-Update activity handled by the clause below.
        Activity.get_create_by_object_ap_id_with_object(id) || activity

      _ ->
        activity
    end

  conversation = create_or_bump_conversation(original_activity, original_activity.actor)
  participations = get_participations(conversation)
  stream_out(activity)
  stream_out_participations(participations)
end
228
# Enqueues a purge job when the activity data carries a DateTime under
# "expires_at". Returns `{:ok, activity}` on success (or when there is nothing
# to schedule) and the enqueue result unchanged on failure.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
242
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; when either step does not
# produce the expected value, that value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    no_conversation ->
      no_conversation
  end
end
250
# Returns the freshly reloaded participations of a conversation, or an empty
# list when no conversation was produced.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
258
# Streams the given participations, with their users preloaded, on the
# "participation" topic.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
266
# Streams the participations of the conversation an object belongs to, but
# only if a direct activity in that context is visible to `user`.
# Returns nil when no conversation exists or nothing is visible.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      visibility_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, visibility_opts) do
        stream_out_participations(conversation.participations)
      end

    not_found ->
      not_found
  end
end

@impl true
def stream_out_participations(_object, _user), do: :noop
286
# Streams Create/Announce/Delete/Update activities to all topics computed for
# them; every other activity is a no-op.
@impl true
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete", "Update"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
299
# Runs `do_create/2` inside a database transaction and unwraps its result.
# A rolled-back transaction is returned as the transaction's own error tuple.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
306
# Builds and inserts a Create activity, then updates counters, notifies,
# schedules poll notifications and federates.
#
# Fake activities are returned right after insertion. In the :benchmark env
# everything after the replies counter is skipped as well. An `{:error, _}`
# from any step rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_params = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  create_data = make_create_data(create_params, additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
341
# Asks the poll worker to schedule the poll-end job for the activity.
# The worker's result is deliberately ignored; this always returns :ok.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
346
# Runs `do_unfollow/4` inside a database transaction and unwraps its result.
# A rolled-back transaction is returned as the transaction's own error tuple.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  unfollow_fun = fn -> do_unfollow(follower, followed, activity_id, local) end

  case Repo.transaction(unfollow_fun) do
    {:ok, result} -> result
    failure -> failure
  end
end
355
# Undoes the latest follow from `follower` to `followed`.
# Returns nil when there is no follow activity to undo; `{:error, _}` from any
# step rolls the surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local)

# Local unfollow: the Undo is stored, the follow is deleted, and the Undo is
# streamed and federated.
defp do_unfollow(follower, followed, activity_id, true) do
  with %Activity{} = follow <- fetch_latest_follow(follower, followed),
       undo_data <- make_unfollow_data(follower, followed, follow, activity_id),
       {:ok, undo} <- insert(undo_data, true),
       {:ok, _deleted} <- Repo.delete(follow),
       _ <- notify_and_stream(undo),
       :ok <- maybe_federate(undo) do
    {:ok, undo}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end

defp do_unfollow(follower, followed, activity_id, false) do
  # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
  # uses deterministic ids for follows.
  # The Undo itself is only built in memory and never inserted.
  with %Activity{} = follow <- fetch_latest_follow(follower, followed),
       {:ok, _deleted} <- Repo.delete(follow),
       undo_data <- make_unfollow_data(follower, followed, follow, activity_id),
       undo <- make_unfollow_activity(undo_data, false),
       _ <- notify_and_stream(undo) do
    {:ok, undo}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
386
# Builds an in-memory (not inserted) %Activity{} for unfollow data, with the
# recipients derived from the data's addressing.
defp make_unfollow_activity(data, local) do
  {recipients, _to, _cc} = get_recipients(data)
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
end
397
# Runs `do_flag/1` inside a database transaction and unwraps its result.
# A rolled-back transaction is returned as the transaction's own error tuple.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
404
# Creates a Flag (report) activity, streams it, federates a stripped copy and
# emails the report to superusers that have an email address.
#
# The report is cc'd to the reported account unless `forward` is explicitly
# `false`. An `{:error, _}` from any step rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    # NOTE(review): `actor` is compared with an AP id here; if callers pass a
    # %User{} this never excludes the reporter - confirm against callers.
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
447
# Announces that `origin` moved to `target` and enqueues the background job
# that migrates followers.
#
# The target must list the origin's AP id in `also_known_as`; otherwise an
# error tuple is returned. Any other failing step is returned unchanged.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    other -> other
  end
end
474
# Builds the query for all Create activities in `context`, newest first.
#
# Without `opts[:user]` only public recipients are considered; with a user,
# their own AP id and everything `User.following/1` returns are added.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
505
# Loads every activity matched by `fetch_activities_for_context_query/2`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
512
# Returns the id of the newest direct-visibility activity in `context`, or nil.
# Preloads are skipped unless `opts` sets `:skip_preload` itself.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, :skip_preload, true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
523
# Paginates `query` with the `:skip_extra_order` option set.
defp fetch_paginated_optimized(query, opts, pagination) do
  # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
531
# Fetches a page of activities addressed to `recipients` or to any list the
# user in `opts[:user]` is a member of. The fetched page is reversed before
# `maybe_update_cc/3` is applied.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
540
# Fetches a page of activities addressed to the public collection (and the
# local-public one when `:includes_local_public` is set). The `:user` option
# is dropped before querying.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  intended_recipients =
    if Map.get(opts, :includes_local_public, false),
      do: [Constants.as_public(), as_local_public()],
      else: [Constants.as_public()]

  anonymous_opts = Map.delete(opts, :user)

  intended_recipients
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> fetch_paginated_optimized(anonymous_opts, pagination)
end
559
# Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
# forced to true.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
566
567 @valid_visibilities ~w[direct unlisted public private]
568
# Narrows `query` to activities whose computed visibility equals (or, for a
# list, is one of) the requested `:visibility`.
#
# An invalid visibility is logged and the query is returned unrestricted,
# mirroring `exclude_visibility/2`. Returning the logger's `:ok` here would
# break every pipeline step that expects a query. A missing or nil
# `:visibility` leaves the query untouched without logging.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
603
# Removes activities whose computed visibility equals (or, for a list, is one
# of) `:exclude_visibilities`. Invalid values are logged and ignored; a missing
# or nil option leaves the query untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  else
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _opts), do: query
646
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped via the third argument or on the user itself.
# Without a user the query is left untouched.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
663
# Fetches activities authored by `user` as seen by `reading_user`, without
# restricting the activity type. `fetch_activities/2`'s result is reversed
# once more before returning.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
678
# Fetches the Create/Announce activities of `user` as seen by `reading_user`,
# in the reverse of the order `fetch_activities_for_user/3` returns them.
# With `total: true` the result is a keyword list whose `:items` are reversed.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = result |> Keyword.get(:items) |> Enum.reverse()

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
692
# Shared implementation of `fetch_user_activities/3`: restricts to
# Create/Announce by `user` and passes along the user's pinned object ids.
# Block and mute filtering for the reader is only applied when the reader does
# not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
719
# Fetches Create/Announce activities visible to `reading_user` using offset
# pagination, in the reverse of the fetched order. With `total: true` the
# result is a keyword list whose `:items` are reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = result |> Keyword.get(:items) |> Enum.reverse()

  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
730
# Shared implementation of `fetch_statuses/2`: forces the type filter to
# Create/Announce and fetches with offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
741
# Computes the recipient list used to scope what a reader may see.
#
# Godmode yields an empty list. A local reader gets the public and
# local-public collections, their own AP id and everything
# `User.following/1` returns. Everyone else only gets the public collection.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: nil}), do: [Constants.as_public()]

defp user_activities_recipients(%{reading_user: reading_user}) do
  if reading_user.local do
    own_addresses = [reading_user.ap_id | User.following(reading_user)]
    [Constants.as_public(), as_local_public() | own_addresses]
  else
    [Constants.as_public()]
  end
end
755
# Drops Announces of objects authored by the `:announce_filtering_user`.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
775
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
783
# Keeps activities whose object's embedded "tag" array contains ALL of
# `:tag_all`. A single binary tag is delegated to the "any" variant.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _opts), do: query
800
# Keeps activities whose object's embedded "tag" array contains ANY of `:tag`.
# A single binary tag is wrapped in a list.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
  )
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _opts), do: query
817
# Drops activities whose object's embedded "tag" array contains ANY of
# `:tag_reject`. A single binary tag is wrapped in a list.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _opts), do: query
835
# Query selecting the distinct ids of objects linked to any hashtag named in
# `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
843
# Keeps activities whose object is linked to ALL hashtags in `:tag_all`
# (hashtag-table variant). A single tag is delegated to the "any" variant.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [only_tag]}) do
  restrict_hashtag_any(query, %{tag: only_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _opts), do: query
874
# Keeps activities whose object is linked to ANY hashtag in `:tag`
# (hashtag-table variant). The hashtag ids are resolved with a separate query
# first. Results are made distinct on, and ordered by, descending object id.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _opts), do: query
900
# Drops activities whose object is linked to ANY hashtag in `:tag_reject`
# (hashtag-table variant). A single binary tag is wrapped in a list.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _opts), do: query
917
# Raised by filters that need the joined child object when the caller asked
# to skip preloading.
defp raise_on_missing_preload,
  do: raise(RuntimeError, "Can't use the child object without preloading!")
921
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are included as well. An empty
# recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
935
# Keeps activities that are either addressed to `recipients` or are public
# and reference one of the given hashtag ids.
# With no hashtag ids (nil or an empty list) this falls back to the plain
# recipient restriction.
defp restrict_recipients_or_hashtags(query, recipients, user, hashtag_ids)
     when hashtag_ids in [nil, []] do
  restrict_recipients(query, recipients, user)
end

defp restrict_recipients_or_hashtags(query, recipients, _user, hashtag_ids) do
  from(
    [activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where:
      (hto.hashtag_id in ^hashtag_ids and ^Constants.as_public() in activity.recipients) or
        fragment("? && ?", ^recipients, activity.recipients)
  )
end
958
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
964
# With `remote: true`, keeps only activities not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
970
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
976
# Keeps only activities of the given `:type`: a single binary type is compared
# for equality, anything else is passed to SQL as an array of allowed types.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
986
# Keeps only activities whose data "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
992
# Keeps only activities whose object's "likes" contain the given AP id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
1001
# With `only_media: true`, keeps Create activities whose object has a
# non-empty "attachment". Needs the joined object, so it raises when
# preloading was skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
1015
# Reply filtering:
#   * `exclude_replies: true` drops every object that has an "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and activities that list
#     the filtering user among their recipients;
#   * `reply_visibility: "following"` additionally keeps replies addressed to
#     the user's friends and the user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
1069
# Drops "Announce" activities when `exclude_reblogs: true` is requested.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
1075
# Mute filtering. `with_muted: true` disables it entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

# Hides activities authored by muted users, and activities whose `to` mentions
# a muted user unless the muting user is the author. Unless `:skip_preload` is
# set, rows matched by the `:thread_mute` named binding are also removed.
defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list passed in via opts; fall back to a lookup.
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^muted_ap_ids,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
1102
# Applies the block rules of `:blocking_user` to the query. The object join is
# added here when the query does not already carry an `:object` named binding.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list passed in via opts; fall back to a lookup.
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  query_with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  query_with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
1169
# Unless `[:activitypub, :blockers_visible]` is exactly `true`, hides activities
# authored by users who block `:blocking_user`, and Announces addressed to them.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

defp restrict_blockers_visibility(query, _opts), do: query
1194
# With `restrict_unlisted: true`, removes activities whose `cc` contains the
# public address (a missing `cc` is treated as empty via `coalesce`).
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public_addresses = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public_addresses
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1208
# With `pinned: true`, keeps only "Create" activities whose object id (embedded
# object's `id`, or the bare `object` string) is one of `:pinned_object_ids`.
# The `:object` named binding is referenced in the binding list although the
# condition itself only reads the activity.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      activity.data,
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _opts), do: query
1224
# Hides "Announce" activities made by actors whose reblogs `:muting_user` muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list passed in via opts; fall back to a lookup.
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1241
# Restricts to activities whose actor URL host (third '/'-separated part of the
# `actor` column) equals `:instance`, or is any of them when a list is given.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
  )
end

defp restrict_instance(query, _opts), do: query
1257
# Applies the user's content filters: when `Filter.compose_regex/1` yields a
# regex, objects whose `content` matches it (case-insensitively) are hidden,
# except for activities authored by the user. A `nil` regex means no filtering.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

# Without a `:user`, fall back to the `:blocking_user`.
defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _opts), do: query
1277
# Removes objects of type "Answer" unless `include_poll_votes: true` is given.
# Only possible when the query has an `:object` named binding; otherwise the
# query is returned as-is.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1289
# Inner-joins the actor's user row (named binding `:u`) and requires
# `invisible == false`. Skipped for "Flag" queries and when
# `invisible_actors: true` is passed.
defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  from(activity in query,
    inner_join: u in User,
    as: :u,
    on: activity.actor == u.ap_id and u.invisible == false
  )
end
1300
# Leaves out the activity with the given (binary) `:exclude_id`.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1306
# Adds the object preload unless `skip_preload: true` is requested.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1313
# Adds the bookmark preload for `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1320
# Adds the report-notes preload only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1327
# Sets the thread-muted field for `opts[:muting_user]` (or `opts[:user]` when
# no muting user is given) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1334
# Orders by `id` when `:order` is `:desc` or `:asc`; otherwise no ordering is added.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1346
# Normalizes the hashtag options (`:tag`, `:tag_all`, `:tag_reject`) through
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values are left untouched.
defp normalize_fetch_activities_query_opts(opts) do
  for key <- [:tag, :tag_all, :tag_reject], reduce: opts do
    acc ->
      case acc[key] do
        name when is_bitstring(name) ->
          Map.put(acc, key, Hashtag.normalize_name(name))

        names when is_list(names) ->
          Map.put(acc, key, names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq())

        _ ->
          acc
      end
  end
end
1366
# Looks up the muting user's outgoing relationship ap_ids in one call and
# returns three option maps (for blocked / muted / reblog-muted restriction)
# with the pre-fetched lists added. Blocks are only included in the lookup when
# `:blocking_user` is the same as `:muting_user`. Keys already present in
# `opts` are never overwritten.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1388
@doc """
Builds the (unexecuted) activity query for `recipients`, applying every
preload, ordering and restriction selected by `opts`.

Hashtag options are normalized first, and the muting/blocking user's
relationship lists are pre-fetched once and handed to the block, mute and
reblog-mute restrictions. Depending on the `:improved_hashtag_timeline`
feature flag, hashtag filtering uses either the hashtag-based or the
embedded-tag-based restrictions.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied exactly once: piping it a second time only
  # recomputed the filter regex and duplicated an identical WHERE condition.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients_or_hashtags(recipients, opts[:user], opts[:followed_hashtags])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1445
@doc """
Fetches the activities `user` has favourited.

Results are ordered by the id of the "Like" activity, descending, so the
order reflects when something was favourited. That id is also exposed as
`pagination_id`, and pagination is told to skip its own ordering.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(
    favourites_query,
    Map.put(params, :skip_order, true),
    pagination
  )
end
1463
# For every activity whose `bcc` contains one of the user's list memberships,
# prepends the user's ap_id to the activity's `cc`. With no list memberships
# (or no user) the activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or a single value; wrap it so the result is always
        # a proper list instead of an improper `[ap_id | nil]` cons cell.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1479
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also being addressed to the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1488
@doc """
Fetches a page of activities bounded by `recipients` /
`recipients_with_public` (see the bounded query restriction), applying the
usual `opts` restrictions. The paginated result is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1500
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data. When `opts[:actor]` is present it is recorded under `"actor"`.
Anything other than `{:ok, data}` from the store is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1509
# Extracts a URL string from an actor's `url` property, which may be a plain
# string, a link map with a string `"href"`, or a list (only the first element
# is considered). Anything else yields `nil`.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1521
# Normalizes an image property into `%{"type" => "Image", "url" => [%{"href" => url}]}`.
# For a list only the first element is used; values without a `"url"` give `nil`.
defp normalize_image(%{"url" => url}) do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_other), do: nil
1531
# Converts a fetched actor object (`data`, a string-keyed map) into the
# attribute map used to build or update a remote user. `additional` may carry
# `:nickname_from_acct`, which is used instead of generating a nickname.
#
# Remote input is not trusted to be well-formed: `attachment`, `tag` and
# `alsoKnownAs` may be missing, `nil` or a single value instead of a list, and
# individual entries may lack the expected keys. Malformed entries are skipped
# rather than crashing the whole user import.
defp object_to_user_data(data, additional) do
  # Profile fields are the "PropertyValue" attachments; other entries (including
  # ones without a "type") are ignored.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Custom emoji: shortcode (colons trimmed) => image URL. Tags missing an icon
  # URL or a string name are skipped.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # Read before `maybe_fix_user_object/1`, as in the original ordering.
  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  # also_known_as must be a URL (http/https); non-string entries are dropped
  also_known_as =
    data
    |> Map.get("alsoKnownAs")
    |> List.wrap()
    |> Enum.filter(fn
      url when is_binary(url) -> URI.parse(url).scheme in ["http", "https"]
      _ -> false
    end)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: also_known_as,
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1610
# Builds `preferredUsername@host` from the actor data. When
# `[WebFinger, :update_nickname_on_user_fetch]` is enabled, that handle is
# fingered and the `acct:` subject from the response is used instead, falling
# back to the generated handle if the lookup does not return one.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  fallback = "#{username}@#{URI.parse(data["id"]).host}"

  if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
    with {:ok, %{"subject" => "acct:" <> acct}} <- WebFinger.finger(fallback) do
      acct
    else
      _ -> fallback
    end
  else
    fallback
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_data), do: nil
1626
@doc """
Fetches the remote following and follower collections of `user` and derives
follow information from them.

Returns `{:ok, map}` with `:follower_count`, `:following_count`,
`:hide_follows` and `:hide_followers`, or `{:error, reason}`. Counts come
from each collection's `"totalItems"` (0 when it is not an integer).
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1646
# Returns integer counters unchanged; everything else becomes 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1649
@doc """
Merges remote follow information into `user_data[:info]` when
`[:instance, :external_user_synchronization]` is `true`, the `:type` is
"Person" or "Service", and both collection addresses are present.

When one of those checks yields `false` the data is returned unchanged. Any
other failure is logged as an error and the data is likewise returned
unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): this reads `user_data[:type]`, while `object_to_user_data/2`
  # in this file emits the type under `:actor_type` - confirm which key callers
  # are expected to provide.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {skipped, false} when skipped in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1680
# Decides whether a follow collection is private (`{:ok, true}`) or public
# (`{:ok, false}`):
#   * an embedded first page of a page type      -> public
#   * a `first` reference that fetches as a page -> public
#   * a fetch rejected with HTTP 401/403         -> private
#   * no `first` at all                          -> private
# Other fetch results are returned as `{:error, _}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1697
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user data. Any non-`{:ok, _}` filter result `e` is returned as
`{:error, e}`.
"""
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    e -> {:error, e}
  end
end
1705
@doc """
Fetches the remote actor at `ap_id` and turns it into user data, including
follow information where applicable.

Returns `{:ok, user_data}` or `{:error, reason}`. Deleted objects and
rejections are logged at debug level; other errors at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # If this has been deleted, only log a debug and not an error
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = rejection} ->
      Logger.debug("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, rejection}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1725
@doc """
Frees up `data[:nickname]` if a different user already holds it.

When a user with that nickname exists under another ap_id, that existing user
is renamed to `"<id>.<nickname>"`. If the ap_ids are identical only an info
message is logged. Returns `nil` when there is no nickname or no clash.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = existing <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == existing.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{existing.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = "#{existing.id}.#{existing.nickname}"

    existing
    |> User.remote_user_changeset(%{nickname: renamed})
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1747
@doc """
Builds the pinned-objects map (`%{object_ap_id => NaiveDateTime}`) from a
featured collection.

Always returns a plain map. Fetch failures and unrecognized input are logged
and yield `%{}`, so callers can wrap the result uniformly. Items may be
either ap_id strings or maps with an `"id"`; anything else is skipped.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
    page
    |> Map.get("orderedItems")
    |> featured_items_to_pin_data()
  else
    e ->
      # `first` is inspected because it is not guaranteed to be a string.
      Logger.error(
        "Could not decode featured collection at fetch #{inspect(first)}, #{inspect(e)}"
      )

      # A bare map, matching the success branch (the caller adds the `:ok` tuple).
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      featured_items_to_pin_data(objects)

    e ->
      Logger.error(
        "Could not fetch featured collection #{inspect(collection["id"])}, #{inspect(e)}"
      )

      %{}
  end
end

# Anything that is not a (possibly ordered) collection cannot hold pins.
def pin_data_from_featured_collection(other) do
  Logger.error("Could not parse featured collection #{inspect(other)}")
  %{}
end

# Maps collection items to `{ap_id, now}` pairs. Items can either be a map
# _or_ a string; `nil`/non-list input is treated as an empty item list.
defp featured_items_to_pin_data(items) do
  items
  |> List.wrap()
  |> Enum.flat_map(fn
    ap_id when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    %{"id" => ap_id} when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    _ -> []
  end)
  |> Map.new()
end
1778
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pin_data}`.

A `nil` address, or a failed fetch (which is logged), yields `{:ok, %{}}`.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
1792
@doc """
Makes sure every pinned object in `pinned_objects` is available, using the
cached object when there is one and fetching it otherwise.

Returns `:ok` when all pins are available, `:error` otherwise, and `nil` for
`nil` input. Checking stops at the first pin that cannot be obtained.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
1805
@doc """
Creates or refreshes the user identified by `ap_id` from its remote actor.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched; a background task is started to fetch its
pinned objects, and the user is updated (if already known) or inserted (after
resolving a clashing nickname). Fetch errors are returned as-is.
"""
def make_user_from_ap_id(ap_id, additional \\ []) do
  existing = User.get_cached_by_ap_id(ap_id)

  cond do
    existing && !User.ap_enabled?(existing) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

        if existing do
          existing
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end
      end
  end
end
1830
@doc """
Resolves `nickname` via WebFinger and creates/refreshes the user from the
returned ap_id, passing the `acct:` subject along as the nickname.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not provide
both a non-nil `"ap_id"` and an `acct:` subject.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
1839
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1844
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1849
@doc """
Returns a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1856
# Applies `Activity.restrict_deactivated_users/1` to the query, except for
# "Flag" queries, which are passed through unchanged.
defp maybe_restrict_deactivated_users(query, %{type: "Flag"}), do: query

defp maybe_restrict_deactivated_users(query, _opts) do
  Activity.restrict_deactivated_users(query)
end
1861 end