5f7541b027df388afe975dda6d50f6498ea55880
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`. `to`, `cc` and `bcc` are read from the
# activity map and default to `[]`. For "Create" activities the actor is added
# to the recipients and duplicates are removed; for every other type the three
# lists are simply concatenated (duplicates are kept).
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing (or nil) actor must contribute nothing. Previously the `[]`
  # default was wrapped in another list, so a bogus `[]` entry ended up among
  # the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Decides whether the activity may be inserted on behalf of its actor.
#
# "Delete" and "Undo" activities are always allowed. Otherwise, when "actor" is
# a binary, the cached user for that AP id must exist with `is_active: true`.
# Activities without a binary "actor" are allowed.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_activity), do: true
69
# Returns true when the embedded object's "content" is no longer than the
# configured `[:instance, :remote_limit]`, measured with `String.length/1`.
# Activities without a non-nil embedded content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity), do: true
76
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public;
otherwise returns `{:ok, actor}` unchanged.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
80
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public;
otherwise returns `{:ok, actor}` unchanged.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
84
@doc """
Calls `User.update_last_status_at/1` for `actor` when `object` is public;
otherwise returns `{:ok, actor}` unchanged.
"""
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88
# For a "Create" whose embedded object carries an "inReplyTo" key, bumps the
# replies counter of the replied-to object, but only when the new object is
# public (returns nil otherwise). Any other input yields `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => reply_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(reply_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
99
@object_types ~w[Question Answer Audio Video Event Article Note Page]

@doc """
Persists `object` and returns `{:ok, persisted, meta}`.

Maps whose "type" is one of the known object types are stored through
`Object.create/1`. Everything else is inserted as an `Activity`; `meta` must
then contain the `:local` key (fetched with `Keyword.fetch!/2`). Any
non-matching result of an intermediate step is returned as-is.
"""
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
124
@doc """
Inserts an activity map into the database.

When `Activity.normalize/1` already finds an activity for `map`, that activity
is returned. Otherwise defaults are applied and the map has to pass the actor
check (skipped when `bypass_actor_check` is true), the remote content limit,
the MRF filter and the containment check before the embedded object and the
activity itself are stored.

With `fake` set to true nothing is stored; an in-memory activity with the id
`"pleroma:fakeid"` is returned instead.

Returns `{:ok, activity}`, `{:error, reason}` or the `{:containment, _}` tuple
of a failed containment check.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(data)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, filtered} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored, local, recipients) do
    # Attach the child object to the returned struct when there is one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media is fetched in a separate task, throttled by the limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Only local posts are pushed to the search index here.
    if local, do: Pleroma.Search.add_to_index(activity)

    {:ok, activity}
  else
    # The activity is already known.
    %Activity{} = existing ->
      {:ok, existing}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error

    # Fake inserts stop before anything is persisted.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:containment, _} = error ->
      error
  end
end
180
# Inserts an Activity row built from `data`, `local` and `recipients`, then
# hands the stored activity to `maybe_create_activity_expiration/1`.
# A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
193
@doc """
Creates notifications for `activity`, creates or bumps the related
conversation and streams both the activity and the conversation's
participations.

For an "Update" activity with a loaded object, the conversation is derived
from the Create activity of that object rather than from the Update itself.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  conversation_source =
    case activity do
      %{data: %{"type" => "Update"}, object: %{data: %{"id" => object_id}}} ->
        Activity.get_create_by_object_ap_id_with_object(object_id)

      _other ->
        activity
    end

  participations =
    conversation_source
    |> create_or_bump_conversation(conversation_source.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
211
# When the activity data carries a `DateTime` under "expires_at", enqueues a
# `PurgeExpiredActivity` job for it and returns `{:ok, activity}`; a failed
# enqueue result is returned unchanged. Activities without such an expiry
# pass straight through as `{:ok, activity}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
225
# Creates or bumps the conversation for `activity` and marks it as read for
# the user behind `actor`. Returns `{:ok, conversation}` on success; the first
# non-matching intermediate result is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
233
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`; any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
241
@doc """
Preloads the `:user` association of `participations` and pushes them to the
"participation" stream.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
249
@doc """
Streams the participations of the conversation that `object`'s "context"
belongs to, provided `user` can still see at least one direct activity in
that context.

Returns `:noop` for arguments that are not an `Object` with a "context".
"""
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      visibility_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, visibility_opts)

      if last_activity_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
269
@doc """
Streams "Create", "Announce", "Delete" and "Update" activities to the topics
computed by `Topics.get_activity_topics/1`. Anything else is a `:noop`.
"""
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ~w[Create Announce Delete Update] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
282
@doc """
Runs `do_create/2` inside a database transaction and unwraps its result.
A failed transaction (e.g. a rollback) is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
289
# Builds the Create data from `params`, inserts it and performs the follow-up
# work: reply counters, note count, last-status timestamp, notifications and
# streaming, poll scheduling and federation.
#
# Fake inserts return right after `insert/3`. In the `:benchmark` env the
# function returns once the reply counter was bumped. Any `{:error, message}`
# rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # Only an explicit `false` turns the activity into a remote one.
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_params = %{
    to: to,
    actor: actor,
    published: published,
    context: context,
    object: object
  }

  create_data = make_create_data(create_params, additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, message} ->
      Repo.rollback(message)
  end
end
324
# Delegates to `PollWorker.schedule_poll_end/1`, ignoring its result, and
# always returns `:ok`.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
329
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps its result.
A failed transaction (e.g. a rollback) is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  unfollow_fun = fn -> do_unfollow(follower, followed, activity_id, local) end

  case Repo.transaction(unfollow_fun) do
    {:ok, result} -> result
    other -> other
  end
end
338
# Undoes the latest follow from `follower` to `followed`.
#
# Returns `{:ok, activity}`, or nil when there is no follow activity to undo.
# An `{:error, error}` from any step rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local)

# Local unfollow: the Undo is inserted and federated, and the follow activity
# is deleted.
defp do_unfollow(follower, followed, activity_id, true = local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, undo_activity} <- insert(unfollow_data, local),
       {:ok, _deleted} <- Repo.delete(follow_activity),
       _ <- notify_and_stream(undo_activity),
       :ok <- maybe_federate(undo_activity) do
    {:ok, undo_activity}
  else
    {:error, error} -> Repo.rollback(error)
    nil -> nil
  end
end

# Remote unfollow: their follow activity is _removed_ from the database,
# because some software (MISSKEEEEY) uses deterministic ids for follows. The
# Undo itself is only built in memory and is not inserted.
defp do_unfollow(follower, followed, activity_id, false) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, _deleted} <- Repo.delete(follow_activity),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       undo_activity <- make_unfollow_activity(unfollow_data, false),
       _ <- notify_and_stream(undo_activity) do
    {:ok, undo_activity}
  else
    {:error, error} -> Repo.rollback(error)
    nil -> nil
  end
end
369
# Builds an in-memory (not inserted) Activity struct for the given unfollow
# data, addressed to the recipients computed by `get_recipients/1`.
defp make_unfollow_activity(data, local) do
  {recipients, _to, _cc} = get_recipients(data)

  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
end
380
@doc """
Runs `do_flag/1` inside a database transaction and unwraps its result.
A failed transaction (e.g. a rollback) is returned as-is.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
387
# Inserts a Flag (report) activity, streams it, federates a stripped copy and
# mails the report to the superusers that have an email address.
#
# The report is cc'ed to the reported account unless `params[:forward]` is
# explicitly `false`. An `{:error, error}` from any step rolls the transaction
# back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # Only an explicit `false` counts as false for both options.
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    # NOTE(review): `actor` is passed on to `AdminEmail.report/5` next to
    # `account`, so it is presumably a user struct rather than an AP id; if so
    # the `ap_id != actor` check never excludes anyone. TODO confirm.
    User.all_superusers()
    |> Enum.filter(fn superuser ->
      superuser.ap_id != actor and not is_nil(superuser.email)
    end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
430
@doc """
Inserts a "Move" activity from `origin` to `target`, addressed to the origin's
followers, then federates it and enqueues the "move_following" background job.

`target` must list the origin's AP id in `also_known_as`; otherwise an error
tuple is returned. Any other non-matching step result is returned as-is.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  job_args = %{"origin_id" => origin.id, "target_id" => target.id}

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
457
@doc """
Builds the query for the "Create" activities of a given `context`, newest
first.

Visibility is limited to public posts plus, when `opts[:user]` is set, that
user's own AP id and the addresses returned by `User.following/1`. Block,
filter, poll-vote and id exclusions are applied according to `opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [a],
    fragment("?->>'type' = ? and ?->>'context' = ?", a.data, "Create", a.data, ^context)
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
488
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching
activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
495
@doc """
Returns the id of the newest direct-visibility activity in `context` that
passes the restrictions derived from `opts`, or nil when there is none.

Object preloading is skipped unless `opts` overrides `:skip_preload`.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
506
# Paginates `query` with `:skip_extra_order` forced on.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
514
@doc """
Fetches a page of activities addressed to `recipients` or to the list
memberships of `opts[:user]`.

The paginated result is reversed and then passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
523
@doc """
Fetches a page of activities addressed to the public collection and, when
`opts[:includes_local_public]` is true, to the local-public address as well.

The `:user` option is dropped before querying.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  includes_local_public = Map.get(opts, :includes_local_public, false)
  opts = Map.delete(opts, :user)

  local_public = if includes_local_public, do: [as_local_public()], else: []
  intended_recipients = [Constants.as_public() | local_public]

  intended_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted(opts)
  |> fetch_paginated_optimized(opts, pagination)
end
542
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
forced to true.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
549
550 @valid_visibilities ~w[direct unlisted public private]
551
# Narrows `query` to activities whose computed visibility equals the given
# `:visibility` option (a single value or a list of values).
#
# An invalid visibility is logged and the query is returned unrestricted. The
# error branches used to return the result of `Logger.error/1` instead of a
# query, which broke the caller's pipeline; returning the query matches what
# `exclude_visibility/2` does for invalid input. Without a `:visibility`
# option the query is returned untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
586
# Drops activities whose computed visibility equals (or is among) the given
# `:exclude_visibilities` option. Invalid values are logged and the query is
# returned unchanged, as it is when the option is absent or nil.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn value -> value in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
629
# Applies the `thread_visibility` SQL function for the requesting user.
# The restriction is skipped when the third argument or the user has
# `skip_thread_containment: true`, or when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _, _), do: query
646
@doc """
Fetches activities authored by `user` as seen by `reading_user`.

Unlike `fetch_user_activities/3` no activity type is imposed. The list
returned by `fetch_activities/2` is reversed before it is returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
661
@doc """
Fetches the "Create" and "Announce" activities of `user` as seen by
`reading_user`, in reversed order.

With `total: true` in `params` the result is a keyword list whose `:items`
entry is reversed; otherwise the result itself is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
675
# Fetches the "Create"/"Announce" activities authored by `user` for
# `reading_user`. Block and mute filtering is applied for the reader unless
# the reader blocks `user`. Pagination defaults to `:keyset` and can be
# overridden with `params[:pagination_type]`.
defp fetch_activities_for_user(user, reading_user, params) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
702
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination, in reversed order.

With `total: true` in `params` the result is a keyword list whose `:items`
entry is reversed; otherwise the result itself is reversed.
"""
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
713
# Offset-paginated fetch of "Create"/"Announce" activities for the recipients
# that `user_activities_recipients/1` grants to `reading_user`.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
724
# Computes the recipient addresses used to filter a user's activities.
#
# With `godmode: true` the list is empty. A local reading user gets the public
# and local-public addresses, their own AP id and the addresses returned by
# `User.following/1`; everyone else only gets the public address.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  if is_nil(reading_user) or !reading_user.local do
    [Constants.as_public()]
  else
    [
      Constants.as_public(),
      as_local_public(),
      reading_user.ap_id | User.following(reading_user)
    ]
  end
end
738
# Hides Announces of objects authored by the `:announce_filtering_user`.
# This needs the joined object, so combining it with `skip_preload: true`
# raises. Without the option the query is returned unchanged.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
758
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing option leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
766
# Requires the object's embedded "tag" array to contain every tag in
# `:tag_all`. A single binary tag is delegated to the "any" variant. Raises
# when combined with `skip_preload: true`, since the joined object is needed.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
783
# Requires the object's embedded "tag" array to contain at least one of the
# tags in `:tag` (a non-empty list or a single binary). Raises when combined
# with `skip_preload: true`, since the joined object is needed.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
800
# Rejects objects whose embedded "tag" array contains any of the tags in
# `:tag_reject` (a non-empty list or a single binary). Raises when combined
# with `skip_preload: true`, since the joined object is needed.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
818
# Query selecting the distinct ids of the objects linked, through the
# "hashtags_objects" table, to any hashtag named in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
826
# Requires the object to be linked to every hashtag in `:tag_all`, using the
# hashtags tables. A single tag is delegated to `restrict_hashtag_any/2`.
# Raises when combined with `skip_preload: true`, since the joined object is
# needed.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  # The aggregated names of the object's matching hashtags must contain all
  # of the requested tags.
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
857
# Requires the object to be linked to at least one hashtag named in `:tag`
# (a non-empty list or a single binary). The hashtag ids are resolved with a
# separate query first. Raises when combined with `skip_preload: true`, since
# the joined object is needed.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids = Repo.all(from(ht in Hashtag, where: ht.name in ^tags, select: ht.id))

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: link in "hashtags_objects",
    on: link.object_id == object.id,
    where: link.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
883
# Rejects objects linked to any hashtag named in `:tag_reject` (a non-empty
# list or a single binary). Raises when combined with `skip_preload: true`,
# since the joined object is needed.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
900
# Raises a RuntimeError; used by restrictions that need the joined object
# when the caller asked to skip preloading.
defp raise_on_missing_preload,
  do: raise("Can't use the child object without preloading!")
904
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are kept as well (OR-ed in). An
# empty recipient list leaves the query unchanged.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
918
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
924
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
930
# With an `:actor_id` option, keeps only activities authored by that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
936
# Filters on the activity "type": a binary must match exactly, any other
# value is passed to SQL `ANY(...)`. No `:type` option leaves the query as is.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
946
# With a `:state` option, keeps only activities whose "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
952
# With a `:favorited_by` option, keeps only activities whose object lists
# that AP id under "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
961
# With `only_media: true`, keeps only "Create" activities whose object
# "attachment" is not equal to the bound empty list. Raises whenever
# `:only_media` is combined with `skip_preload: true`, since the joined object
# is needed.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
975
# Reply filtering, in clause order:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` additionally keeps replies that have the
#     filtering user among their recipients;
#   * `reply_visibility: "following"` additionally keeps replies addressed to
#     the user or one of their friends, and the user's own posts.
# Anything else leaves the query unchanged.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  visible_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^visible_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
1029
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
1035
# Hides activities authored by, or addressed ("to") to, users muted by
# `:muting_user`; the user's own activities stay visible even when addressed
# to a muted user. Unless preloading is skipped, activities with a matching
# `thread_mute` join row are hidden as well. `with_muted: true` or a missing
# muting user disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
1062
# Hides activities that involve users or domains blocked by `:blocking_user`.
# The object is joined first when the query has no `:object` binding yet.
# Without a blocking user the query is returned unchanged.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # The author is not blocked.
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # No recipient is blocked, unless the user wrote the post themselves.
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # No recipient is on a blocked domain, unless the user wrote the post.
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It is not a boost of a blocked user.
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # The author's domain is not blocked, unless the user follows the author.
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # The same domain check, applied to the actor of the object.
  |> where(
    [_activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
1129
# Unless `[:activitypub, :blockers_visible]` is exactly `true`, hides activities
# authored by users who block `blocking_user`, as well as boosts addressed to them.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

# No blocking user given: nothing to filter.
defp restrict_blockers_visibility(query, _opts), do: query
1154
# With `restrict_unlisted: true`, drops activities whose `cc` contains the public
# collection address.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1168
# With `pinned: true`, keeps only `Create` activities whose object id is one of
# `pinned_object_ids`. The `object` named binding must already exist on the query.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      activity.data,
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _opts), do: query
1184
# Hides `Announce` activities from actors whose reblogs `muting_user` has muted.
# The ap_ids can be passed pre-computed in `opts[:reblog_muted_users_ap_ids]`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1201
# Keeps only activities whose actor URL host equals the given instance ...
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

# ... or is any of the given instances.
defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
  )
end

defp restrict_instance(query, _opts), do: query
1217
# Applies the user's composed filter regex to the object content
# (case-insensitively). Activities authored by the user are always kept.
# Nothing is filtered when `Filter.compose_regex/1` returns nil.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

# Falls back to the blocking user when no `:user` is given.
defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _opts), do: query
1237
# `include_poll_votes: true` keeps poll votes in the result.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

# Otherwise objects of type `Answer` are dropped, but only when the object is
# already joined under the `:object` named binding.
defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1249
# Flag queries and callers passing `invisible_actors: true` are not restricted.
defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

# Otherwise activities by invisible users are dropped. The invisible ap_ids are
# loaded from the database while the query is being built.
defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(fn %{ap_id: invisible_ap_id} -> invisible_ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1261
# Drops the activity with the given (binary) id from the result.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1267
# Preloads the object unless the caller asked for `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1274
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1281
# Report notes are only preloaded on explicit request (`preload_report_notes: true`).
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1288
# Sets the thread-muted field for the muting user (falling back to `opts[:user]`)
# unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1295
# Orders by id in the requested direction; without a recognised `:order` the
# query is left as is.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1307
# Normalizes the hashtag options (`:tag`, `:tag_all`, `:tag_reject`) with
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values are left untouched.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn tag_key, acc ->
    case acc[tag_key] do
      name when is_bitstring(name) ->
        Map.put(acc, tag_key, Hashtag.normalize_name(name))

      names when is_list(names) ->
        unique_names = names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
        Map.put(acc, tag_key, unique_names)

      _other ->
        acc
    end
  end)
end
1327
# Loads the muting user's outgoing mute / reblog-mute relationships (plus blocks
# when the blocking user is the same user) in a single call, then returns three
# option maps — for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2` — carrying the preloaded ap_ids.
# Keys already present in `opts` take precedence over the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if opts[:blocking_user] && opts[:blocking_user] == muting_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(muting_user, relationship_types)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded[:reblog_mute]}, opts)
  }
end
1349
@doc """
Builds the Ecto query used to fetch activities for `recipients`.

`opts` is a map of filters; each `restrict_*`, `exclude_*` and `maybe_*` step
inspects the keys it cares about and otherwise returns the query unchanged.
Hashtag options are normalized first, and the mute/block relationships of the
muting user are preloaded once and handed to the relevant steps.

Depending on the `:improved_hashtag_timeline` feature flag, hashtag filtering
uses either the hashtag steps or the embedded-tag steps.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # NOTE: `restrict_filtered/2` used to be piped twice. Both applications added
  # the same predicate (where clauses are AND-ed) and composed the filter regex
  # twice, so it is now applied only once.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1406
@doc """
Fetches the activities `user` has favourited, ordered by the id of the `Like`
activity in descending order (most recently favourited first).

Each returned activity carries its object and uses the id of the `Like` as its
`pagination_id`. `params` is forwarded to the paginator with `skip_order: true`
so the ordering set here is kept.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.merge(params, %{skip_order: true})

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1424
# For every activity whose `bcc` contains one of `list_memberships`, prepends the
# user's ap_id to the activity's `cc`. Other activities are returned unchanged.
# When `list_memberships` is empty (or the third argument is not a user) the
# activities are returned as they are.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `cc` may be missing or a single value; List.wrap/1 keeps the result a
        # proper list instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1440
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also being addressed to the public collection.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1449
# Fetches a page of activities bounded by the two recipient lists (see
# `fetch_activities_bounded_query/3`) and returns it in reversed order.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)

  Enum.reverse(page)
end
1461
# Stores `file` through `Upload.store/2` and inserts an object holding the
# resulting data, with `opts[:actor]` recorded as "actor" when present.
# Anything other than `{:ok, data}` from the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1470
# Extracts a URL string from an actor's `url` property, which may be a plain
# string, a link map with an "href", or a list of those (the first entry wins).
# Returns nil for anything else.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1482
# Normalizes an actor image (icon or banner) into an `Image` map whose "url" is a
# one-element list of link maps. For a list of images the first one is used;
# anything without a "url" yields nil.
defp normalize_image(%{"url" => url}) do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_other), do: nil
1492
# Converts a fetched actor object (`data`, string keys) into the attribute map
# used to build or update a remote user. `additional[:nickname_from_acct]`, when
# given, is used as the nickname instead of generating one.
#
# The actor document comes from a remote server, so malformed `attachment`,
# `tag` and `alsoKnownAs` entries are skipped rather than raising.
defp object_to_user_data(data, additional) do
  # Profile fields are the `PropertyValue` attachments; entries that are not
  # maps or lack a "type" are ignored.
  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Custom emoji: only `Emoji` tags carrying both a name and an icon URL are
  # usable. The comprehension silently skips everything else.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(Map.get(data, "tag", [])),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # Read before `maybe_fix_user_object/1` rewrites `data`, as before.
  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  # also_known_as must be a list of http(s) URLs; non-string entries are dropped
  also_known_as =
    data
    |> Map.get("alsoKnownAs", [])
    |> List.wrap()
    |> Enum.filter(fn
      url when is_binary(url) -> URI.parse(url).scheme in ["http", "https"]
      _ -> false
    end)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: also_known_as,
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1571
# Builds `preferredUsername@host` from the actor data. When
# `[WebFinger, :update_nickname_on_user_fetch]` is enabled, the generated name is
# resolved through WebFinger and the returned `acct:` subject is used instead;
# any WebFinger failure falls back to the generated name.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  generated = "#{username}@#{URI.parse(data["id"]).host}"

  with enabled when enabled not in [nil, false] <-
         Config.get([WebFinger, :update_nickname_on_user_fetch]),
       {:ok, %{"subject" => "acct:" <> acct}} <- WebFinger.finger(generated) do
    acct
  else
    _ -> generated
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_data), do: nil
1587
# Fetches the user's remote following and follower collections and derives the
# counters (`totalItems`, 0 when not an integer) plus whether each collection is
# private. Returns `{:ok, map}` or `{:error, reason}`.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_information = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_information}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1607
# Counters reported by remote servers are only trusted when they are integers;
# anything else counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1610
# Merges freshly fetched follow information into `user_data[:info]` when
# external user synchronization is enabled, the user type is "Person" or
# "Service", and both collection addresses are present. In every other case
# `user_data` is returned unchanged; unexpected failures are logged first.
#
# NOTE(review): the type check reads `user_data[:type]`, while the map built by
# `object_to_user_data/2` in this file stores the type under `:actor_type` —
# confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition explicitly evaluated to false: silently keep the data.
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1641
# Decides whether a follow collection is private.
# An embedded first page means the collection is readable.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

# A referenced first page is fetched: a readable page means public, a 401/403
# response means private, and any other failure is reported as an error.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

# No first page at all: treated as private.
defp collection_private(_data), do: {:ok, true}
1658
# Runs the actor object through the MRF pipeline and, when accepted, converts it
# into user data. Any other MRF result is wrapped as `{:error, result}`.
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    rejected -> {:error, rejected}
  end
end
1666
# Fetches the actor at `ap_id`, converts it into user data and optionally
# enriches it with follow information. Failures are logged at a level matching
# their severity and returned as `{:error, reason}`.
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = error ->
      log_user_fetch_failure(ap_id, reason)
      error
  end
end

# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = reason) do
  Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
end

# MRF rejections are expected and only logged at info level.
defp log_user_fetch_failure(ap_id, {:reject, reason}) do
  Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
end

defp log_user_fetch_failure(ap_id, reason) do
  Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
end
1686
# When another user already holds `data[:nickname]` under a different ap_id, that
# old user is renamed to "<id>.<nickname>" so the nickname becomes free.
# Returns nil when there is no nickname or no existing user.
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1708
@doc """
Turns a featured (pinned posts) collection into a map of
`object_ap_id => NaiveDateTime` entries.

Always returns a map. When the collection cannot be fetched or has an
unexpected shape, the problem is logged and an empty map is returned, so callers
can wrap the result in `{:ok, _}` without special-casing failures.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  with {:ok, %{"orderedItems" => items}} when is_list(items) <-
         Fetcher.fetch_and_contain_remote_object_from_id(first) do
    featured_items_to_pins(items)
  else
    e ->
      # `first` may be an embedded page (a map), hence inspect/1.
      Logger.error(
        "Could not decode featured collection at fetch #{inspect(first)}, #{inspect(e)}"
      )

      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  with {:ok, objects} when is_list(objects) <-
         Collections.Fetcher.fetch_collection(collection) do
    featured_items_to_pins(objects)
  else
    e ->
      Logger.error("Could not fetch featured collection #{inspect(collection)}, #{inspect(e)}")
      %{}
  end
end

# Anything that is not a (ordered) collection cannot contain pins.
def pin_data_from_featured_collection(other) do
  Logger.error("Could not parse featured collection #{inspect(other)}")
  %{}
end

# Items can either be a map _or_ a string; anything else is skipped.
defp featured_items_to_pins(items) do
  pinned_at = NaiveDateTime.utc_now()

  for item <- items, ap_id = featured_item_ap_id(item), into: %{} do
    {ap_id, pinned_at}
  end
end

defp featured_item_ap_id(ap_id) when is_binary(ap_id), do: ap_id
defp featured_item_ap_id(%{"id" => ap_id}) when is_binary(ap_id), do: ap_id
defp featured_item_ap_id(_item), do: nil
1739
# Fetches the featured collection at `ap_id` and converts it into pin data.
# Always returns an `{:ok, _}` tuple: a missing address or a failed fetch yields
# an empty map (the failure is logged).
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      {:ok, pin_data_from_featured_collection(data)}

    failure ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(failure)}")
      {:ok, %{}}
  end
end
1753
# Makes sure every pinned object is available locally, fetching the ones that
# are not cached. Returns `:ok` when all of them are available and `:error`
# otherwise (checking stops at the first failure). A nil argument yields nil.
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
1766
# Creates or refreshes the local record of the remote user at `ap_id`.
# A cached user that is not AP-enabled is upgraded instead. Otherwise the actor
# is fetched, its pinned objects are fetched in a background task, and the user
# is either updated or inserted (after resolving a nickname clash).
def make_user_from_ap_id(ap_id, additional \\ []) do
  existing = User.get_cached_by_ap_id(ap_id)

  cond do
    existing && !User.ap_enabled?(existing) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

        if existing do
          existing
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end
      end
  end
end
1791
# Resolves `nickname` through WebFinger and builds the user from the resulting
# ap_id, passing the returned `acct` on as the nickname. Any other WebFinger
# result is reported as an error.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _failure ->
      {:error, "No AP id in WebFinger"}
  end
end
1800
# Filters out broken threads: delegates to the thread visibility check for `user`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1805
# Post-processing check for a single activity; currently this is only the
# broken-thread containment.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1810
# Query for all `Create` activities with direct visibility, ordered by ascending id.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1817
# Flag queries are returned unchanged; every other query goes through
# `Activity.restrict_deactivated_users/1`.
defp maybe_restrict_deactivated_users(query, %{type: "Flag"}), do: query

defp maybe_restrict_deactivated_users(query, _opts) do
  Activity.restrict_deactivated_users(query)
end
1822 end