a4f1c7041969510c30a2a2cecefb9ac6f98132ff
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`, where `to` and `cc` are the raw "to"/"cc"
# fields (defaulting to `[]`) and `recipients` is the concatenation of
# "to", "cc" and "bcc".
#
# For "Create" activities the actor is added to the recipients as well and
# the resulting list is de-duplicated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 yields [] for a missing actor, so no stray `[]` element ends
  # up among the recipients when the activity carries no "actor".
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Decides whether the activity's actor may insert it.
#
# "Delete" and "Undo" activities always pass. Otherwise, when "actor" is a
# binary, the cached user for that AP id must exist and have
# `is_active: true`. Anything else (e.g. no binary "actor") passes.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_activity), do: true
69
# Checks the length of an embedded object's "content" against the
# `[:instance, :remote_limit]` setting.
#
# Returns `true` when the content is within the limit, or when there is no
# embedded object with non-nil "content" to check.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

defp check_remote_limit(_activity), do: true
76
# Calls `User.increase_note_count/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
80
# Calls `User.decrease_note_count/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
84
# Calls `User.update_last_status_at/1` for `actor` when `object` is public;
# otherwise returns `{:ok, actor}` unchanged.
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88
# For a "Create" whose embedded object has an "inReplyTo" key, increases the
# replies count of the replied-to object, but only when the embedded object is
# public (returns `nil` otherwise). Any other input yields `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
99
# Persisting callback: stores `object` and returns `{:ok, stored, meta}`.
#
# * Maps whose "type" is one of @object_types are created via `Object.create/1`.
# * Activities of an @unpersisted_activity_types type with `meta` equal to
#   `[local: false]` are NOT written to the database; an in-memory `%Activity{}`
#   is built and returned instead.
# * Everything else is inserted as an `%Activity{}`; `meta` must contain
#   `:local` (`Keyword.fetch!/2` raises otherwise).
#
# Non-matching results of the `with` steps are returned as-is.
@object_types ~w[Question Answer Audio Video Event Article Note Page]
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

@unpersisted_activity_types ~w[Undo Delete Remove Accept Reject]
@impl true
def persist(%{"type" => type} = object, [local: false] = meta)
    when type in @unpersisted_activity_types do
  {recipients, _, _} = get_recipients(object)

  unpersisted = %Activity{
    data: object,
    local: false,
    recipients: recipients,
    actor: object["actor"]
  }

  {:ok, unpersisted, meta}
end

@impl true
def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }),
       # TODO: add tests for expired activities, when Note type will be supported in new pipeline
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
141
# Inserts an activity map, running it through the actor check, the remote
# content limit, MRF filtering and containment first.
#
# * `local` - stored on the activity; local activities are also added to the
#   search index.
# * `fake` - when true nothing is stored; an `%Activity{}` with the id
#   "pleroma:fakeid" is returned instead.
# * `bypass_actor_check` - skips `check_actor_can_insert/1`.
#
# If `Activity.normalize/1` already yields an activity, that one is returned.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, stored} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(stored, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Add local posts to search index
    if local do
      Pleroma.Search.add_to_index(activity)
    end

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:containment, _} = failure ->
      failure

    {:error, _} = failure ->
      failure
  end
end
197
# Inserts an `%Activity{}` built from `data` and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert result is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  insert_result =
    Repo.insert(%Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    })

  case insert_result do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
210
# Creates notifications for `activity`, bumps the related conversation and
# streams out both the activity and the conversation's participations.
#
# For "Update" activities with a loaded object, the conversation is looked up
# from the object's Create activity instead of the Update itself.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  source_activity =
    case activity do
      %{data: %{"type" => "Update"}, object: %{data: %{"id" => object_ap_id}}} ->
        Activity.get_create_by_object_ap_id_with_object(object_ap_id)

      _ ->
        activity
    end

  participations =
    source_activity
    |> create_or_bump_conversation(source_activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
228
# Enqueues a `PurgeExpiredActivity` job when the activity data carries a
# `DateTime` under "expires_at"; returns `{:ok, activity}` on success and the
# enqueue result as-is otherwise. Activities without such a value are passed
# through as `{:ok, activity}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
242
# Creates or bumps the conversation for `activity` and marks it as read for
# the user identified by the `actor` AP id.
#
# Returns `{:ok, conversation}` on success; if either lookup does not yield
# the expected shape, that value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
250
# Returns the freshly (force-)preloaded participations of the conversation in
# an `{:ok, conversation}` tuple, or `[]` for any other input.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
258
# Preloads the `:user` of every participation and streams them on the
# "participation" topic.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
266
# Streams the participations of the conversation matching the object's
# "context", provided a latest direct activity id is found for `user` in that
# context. Returns `:noop` for inputs that are not an object with a context.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    no_conversation ->
      no_conversation
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
286
# Streams Create/Announce/Delete/Update activities to the topics computed by
# `Topics.get_activity_topics/1`; every other input is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete", "Update"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
299
# Runs `do_create/2` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; any other transaction result is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
306
# Builds the Create data from `params`, inserts it and performs the follow-up
# work: reply/note counters, last-status timestamp, notifications/streaming,
# poll scheduling and federation.
#
# Fake activities and quick inserts (the `:benchmark` env) return right after
# their respective step. An `{:error, message}` rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  base_params = %{to: to, actor: actor, published: published, context: context, object: object}
  create_data = make_create_data(base_params, additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, message} ->
      Repo.rollback(message)
  end
end
341
# Asks `PollWorker` to schedule the poll end for `activity`; the worker's
# result is ignored and `:ok` is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
346
# Runs `do_unfollow/4` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; any other transaction result is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction_result =
    Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction_result do
    {:ok, result} -> result
    failure -> failure
  end
end
355
# Undoes the latest follow from `follower` to `followed`.
#
# Local unfollows insert an unfollow activity, delete the follow activity,
# notify/stream and federate. Remote unfollows delete the follow activity and
# only build an in-memory unfollow activity for notification/streaming.
#
# Returns `nil` when no follow activity is found; `{:error, error}` rolls the
# surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local)

defp do_unfollow(follower, followed, activity_id, true) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, unfollow_activity} <- insert(unfollow_data, true),
       {:ok, _deleted} <- Repo.delete(follow_activity),
       _ <- notify_and_stream(unfollow_activity),
       :ok <- maybe_federate(unfollow_activity) do
    {:ok, unfollow_activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end

defp do_unfollow(follower, followed, activity_id, false) do
  # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
  # uses deterministic ids for follows.
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, _deleted} <- Repo.delete(follow_activity),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       unfollow_activity <- make_unfollow_activity(unfollow_data, false),
       _ <- notify_and_stream(unfollow_activity) do
    {:ok, unfollow_activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
386
# Builds an in-memory (not inserted) `%Activity{}` from unfollow `data`,
# addressing it to the recipients computed by `get_recipients/1`.
defp make_unfollow_activity(data, local) do
  {recipients, _to, _cc} = get_recipients(data)

  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
end
397
# Runs `do_flag/1` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; any other transaction result is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
404
# Inserts a Flag (report) activity, notifies/streams it, federates a version
# with stripped status data and e-mails every superuser that has an e-mail
# address and is not the reporting actor.
#
# `params[:forward]` (default true) decides whether the reported account's AP
# id is put into "cc". `{:error, error}` rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn user -> user.ap_id == actor or is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
447
# Inserts a "Move" activity from `origin` to `target`, addressed to the
# origin's follower address, then federates it and enqueues the
# "move_following" background job.
#
# Fails with an error tuple unless `origin.ap_id` is listed in
# `target.also_known_as`; other non-matching step results are returned as-is.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
474
# Builds the query for "Create" activities belonging to `context`, newest
# first.
#
# Recipients are restricted to the public address plus, when `opts[:user]` is
# given, that user's AP id and followings. Block, filter, poll-vote and id
# exclusions from `opts` are applied as well.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    if opts[:user] do
      [opts[:user].ap_id | User.following(opts[:user])] ++ public
    else
      public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
505
# Runs `fetch_activities_for_context_query/2` and returns all matching
# activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
512
# Returns the id of the first direct-visibility activity of the context query
# (which orders by descending id), or `nil` when there is none.
# `skip_preload: true` is used unless `opts` overrides it.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
523
# Paginates `query` with `:skip_extra_order` forced to true in `opts`.
defp fetch_paginated_optimized(query, opts, pagination) do
  # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
  optimized_opts = Map.put(opts, :skip_extra_order, true)

  Pagination.fetch_paginated(query, optimized_opts, pagination)
end
531
# Fetches a page of activities addressed to `recipients` or to any list
# membership of `opts[:user]`, reverses the page and passes it through
# `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  list_memberships = Pleroma.List.memberships(opts[:user])
  all_recipients = recipients ++ list_memberships

  all_recipients
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, opts[:user])
end
540
# Fetches a page of activities addressed to the public collection (and to the
# local-public address when `opts[:includes_local_public]` is set).
# The `:user` option is dropped before querying.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  intended_recipients =
    case Map.get(opts, :includes_local_public, false) do
      falsy when falsy in [nil, false] -> [Constants.as_public()]
      _truthy -> [Constants.as_public(), as_local_public()]
    end

  query_opts = Map.delete(opts, :user)

  intended_recipients
  |> fetch_activities_query(query_opts)
  |> restrict_unlisted(query_opts)
  |> fetch_paginated_optimized(query_opts, pagination)
end
559
# Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
# set to true in `opts`.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  restricted_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(restricted_opts, pagination)
end
566
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` equals the
# given `:visibility` (a single value or a list of values).
#
# Invalid values are logged and the query is returned unrestricted, which
# mirrors `exclude_visibility/2`; returning the logger result here would break
# the caller's query pipeline. A missing or `nil` visibility leaves the query
# untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 keeps logging safe for terms that cannot be interpolated
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
603
# Removes activities whose `activity_visibility(...)` equals (or is among) the
# given `:exclude_visibilities` from `query`.
#
# Invalid values are logged and the query is returned unchanged; a missing or
# `nil` value leaves the query untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  case Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    true ->
      where(
        query,
        [a],
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
      )

    false ->
      Logger.error("Could not exclude visibility to #{visibility}")
      query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
646
# Restricts `query` through the `thread_visibility(...)` SQL function for the
# given user.
#
# The restriction is skipped when the third argument or the user has
# `skip_thread_containment: true`, or when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
663
# Fetches activities authored by `user` as seen by `reading_user`, without
# restricting the activity type. `params[:godmode]` lifts the recipient
# restriction. The result of `fetch_activities/2` is reversed.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
678
# Fetches the Create/Announce activities of `user` as seen by `reading_user`
# and reverses them. With `total: true` in `params` the result is a keyword
# list whose `:items` entry is reversed instead.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
692
# Prepares the params for listing `user`'s Create/Announce activities for
# `reading_user` and fetches them.
#
# Block and mute filtering for `reading_user` is only enabled when
# `reading_user` does not block `user`. Pagination defaults to `:keyset`
# unless `params[:pagination_type]` is set.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
719
# Fetches Create/Announce activities visible to `reading_user` and reverses
# them. With `total: true` in `params` the result is a keyword list whose
# `:items` entry is reversed instead.
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
730
# Fetches Create/Announce activities for `reading_user` using offset
# pagination; `params[:godmode]` lifts the recipient restriction.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
741
# Returns the recipient addresses used when listing activities.
#
# * godmode: `[]`
# * local reading user: public, local-public, the user's AP id and followings
# * otherwise: only the public address
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  local_reader? = reading_user != nil and reading_user.local

  if local_reader? do
    [
      Constants.as_public(),
      as_local_public(),
      reading_user.ap_id | User.following(reading_user)
    ]
  else
    [Constants.as_public()]
  end
end
755
# Keeps only rows that are not an "Announce" or whose object actor differs
# from the `:announce_filtering_user`. Needs the joined object, so combining
# it with `skip_preload: true` raises.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
775
# Keeps only activities with an id greater than `:since_id`; an empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
783
# Keeps objects whose embedded 'tag' data contains ALL of `:tag_all`
# (JSONB `?&`). A single binary tag is delegated to
# `restrict_embedded_tag_any/2`. Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _opts), do: query
800
# Keeps objects whose embedded 'tag' data contains ANY of `:tag`
# (JSONB `?|`). A single binary tag is wrapped into a list.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _opts), do: query
817
# Drops objects whose embedded 'tag' data contains ANY of `:tag_reject`.
# A single binary tag is wrapped into a list.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _opts), do: query
835
# Query selecting the distinct object ids from "hashtags_objects" that are
# linked to a hashtag named in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
843
# Keeps objects linked (via hashtags_objects) to ALL hashtags in `:tag_all`.
# A one-element list is delegated to `restrict_hashtag_any/2`; a binary tag is
# wrapped into a list. Raises when combined with `skip_preload: true`.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _opts), do: query
874
# Keeps objects linked (via hashtags_objects) to ANY hashtag in `:tag`,
# distinct on and ordered by descending object id. The hashtag ids are
# resolved with a separate query first. A binary tag is wrapped into a list.
# Raises when combined with `skip_preload: true`.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _opts), do: query
900
# Drops objects linked to ANY hashtag in `:tag_reject`, using
# `object_ids_query_for_tags/1` as a subquery. A binary tag is wrapped into a
# list. Raises when combined with `skip_preload: true`.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _opts), do: query
917
# Raises a RuntimeError signalling that a restriction needing the joined
# object was requested without preloading it.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
921
# Keeps activities whose recipients overlap (`&&`) with `recipients`.
# When a user is given, activities authored by that user are kept as well.
# An empty recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
935
# Keeps only local activities when `local_only: true` is given.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
941
# Keeps only non-local activities when `remote: true` is given.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
947
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
953
# Keeps only activities whose data 'type' equals `:type` (binary) or is among
# `:type` (any other value, compared with `ANY`).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
963
# Keeps only activities whose data 'state' equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
969
# Keeps only rows whose object 'likes' data contains the `:favorited_by` AP id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
978
# With `only_media: true`, keeps only "Create" activities whose object
# 'attachment' is not equal to the empty list. Needs the joined object, so
# combining `:only_media` with `skip_preload: true` raises.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
992
# Filters replies out of `query`:
#
# * `exclude_replies: true` - keeps only objects without 'inReplyTo'.
# * `reply_visibility: "self"` - keeps non-replies and replies that have the
#   `:reply_filtering_user` among the recipients.
# * `reply_visibility: "following"` - see the inline SQL comments below.
#
# Any other options leave the query untouched.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create' -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
        -- unless they are the author (because authors
        -- are also part of the recipients). This leads
        -- to a bug that self-replies by friends won't
        -- show up.
        OR ? = ? -- The actor is us
        """,
        activity.data,
        object.data,
        ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _opts), do: query
1046
# Drops "Announce" activities when `exclude_reblogs: true` is given.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
1052
# Hides activities by muted actors and activities addressed ('to') to muted
# users (unless authored by the muting user). Unless `:skip_preload` is set,
# rows with a thread-mute entry are dropped as well.
#
# `with_muted: true` or a missing `:muting_user` leaves the query untouched.
# `opts[:muted_users_ap_ids]` may supply the mute list; otherwise it is looked
# up via `User.muted_users_ap_ids/1`.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    from([activity] in query,
      where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
      where:
        fragment(
          "not (?->'to' \\?| ?) or ? = ?",
          activity.data,
          ^mutes,
          activity.actor,
          ^user.ap_id
        )
    )

  if opts[:skip_preload] do
    without_muted
  else
    from([thread_mute: tm] in without_muted, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
1079
# Hides content involving accounts or domains blocked by `blocking_user`.
#
# `opts[:blocked_users_ap_ids]` may carry a preloaded block list; otherwise it
# is looked up for the user. The object is joined when the query does not
# already have an `:object` named binding. Without a `blocking_user` the query
# is returned as-is.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  query_with_object =
    case has_named_binding?(query, :object) do
      true -> query
      false -> Activity.with_joined_object(query)
    end

  query_with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
1146
# Hides content from accounts that block `blocking_user`, unless the
# `[:activitypub, :blockers_visible]` setting is exactly `true`.
# Without a `blocking_user` the query is returned unchanged.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _hidden ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

defp restrict_blockers_visibility(query, _opts), do: query
1171
# With `restrict_unlisted: true`, drops activities whose `cc` contains the
# public collection address; otherwise leaves the query untouched.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public_addresses = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public_addresses
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1185
# With `pinned: true`, keeps only `Create` activities whose object id (either
# embedded under `object.id` or given as a plain string) is in `pinned_object_ids`.
# The query must already carry an `:object` named binding.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      activity.data,
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _opts), do: query
1201
# Drops `Announce` activities made by accounts whose reblogs `muting_user` has
# muted. The list comes from `opts[:reblog_muted_users_ap_ids]` when present,
# otherwise it is looked up for the user.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1218
# Keeps only activities whose actor URL host (third `/`-separated segment)
# equals `instance` (binary) or is one of `instance` (list).
# Any other option shape leaves the query unchanged.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
  )
end

defp restrict_instance(query, _opts), do: query
1234
# Applies the user's composed content filter regex: objects whose `content`
# matches (case-insensitively) are hidden unless the user authored the activity.
# The filtering user is taken from `:user`, falling back to `:blocking_user`.
# When no regex is composed, or no user is given, the query is unchanged.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _opts), do: query
1254
# Removes poll votes (objects of type "Answer") unless `include_poll_votes: true`
# is given. The filter is only applied when the query has an `:object` named binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1266
# Inner-joins the activity's actor (bound as `:u`) and keeps only rows whose
# actor is not marked invisible. Skipped for `type: "Flag"` queries and when
# `invisible_actors: true` is requested.
defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  join(query, :inner, [activity], u in User,
    as: :u,
    on: activity.actor == u.ap_id and u.invisible == false
  )
end
1277
# Omits the activity with the given binary `exclude_id`, if one was supplied.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1283
# Preloads each activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1290
# Preloads the bookmark of `opts[:user]` for each activity unless
# `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1297
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1304
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1311
# Orders by id when `order` is `:desc` or `:asc`; any other value (or no
# `order` key) leaves the query's ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1323
# Normalizes the hashtag options `:tag`, `:tag_all` and `:tag_reject`.
# A single name is normalized in place; a list is normalized element-wise and
# de-duplicated. Other values (including a missing key) are left as they are.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn tag_key, acc ->
    case acc[tag_key] do
      name when is_bitstring(name) ->
        Map.put(acc, tag_key, Hashtag.normalize_name(name))

      names when is_list(names) ->
        unique_names = for name <- names, uniq: true, do: Hashtag.normalize_name(name)
        Map.put(acc, tag_key, unique_names)

      _other ->
        acc
    end
  end)
end
1343
# Loads the muting user's outgoing relationship AP ids in one call and returns
# three option maps (for blocks, mutes and reblog mutes) carrying those
# preloaded lists. Blocks are only preloaded when `blocking_user` is the same
# user as `muting_user`. Values already present in `opts` take precedence.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1365
# Builds the Ecto query for fetching activities addressed to `recipients`.
#
# `opts` is a map of filtering options. Hashtag options are normalized first,
# then the block/mute/reblog-mute AP id lists are preloaded once and handed to
# the matching restriction steps. Each `restrict_*` / `exclude_*` / `maybe_*`
# step either narrows the query or returns it unchanged depending on `opts`.
#
# Hashtag restrictions use the hashtag-table variants when the
# `:improved_hashtag_timeline` feature is enabled, and the embedded-tag
# variants otherwise.
#
# Returns an `Ecto.Query`; nothing is executed here.
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: a second application would only add the same
    # regex condition to the WHERE clause again.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1422
@doc """
Fetches the activities `user` has favourited, ordered by the id of the `Like`
activity (newest like first).

Each returned activity has its `object` populated and its `pagination_id` set
to the id of the corresponding `Like`, so pagination follows favouriting order.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # Ordering is already applied above, so the paginator must not add its own.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1440
# For every activity whose non-empty `bcc` mentions one of `list_memberships`,
# prepends the user's AP id to the activity's `cc`. Activities without such a
# `bcc` are passed through, as is the whole list when `list_memberships` is
# empty or no user struct is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn activity ->
    case activity do
      %{data: %{"bcc" => [_ | _] = bcc}} ->
        addressed_via_list? =
          Enum.any?(bcc, fn address -> Enum.member?(list_memberships, address) end)

        if addressed_via_list? do
          update_in(activity.data["cc"], fn cc -> [user_ap_id | cc] end)
        else
          activity
        end

      _no_bcc ->
        activity
    end
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1456
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` while also being addressed to the public collection.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1465
# Fetches one page of activities using the bounded recipient rule (see
# `fetch_activities_bounded_query/3`) on top of the standard activity query,
# and returns the page in reversed order.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1477
# Stores `file` through `Upload.store/2` and inserts an object for the result.
# When `opts[:actor]` is present it is recorded under the object's "actor" key.
# A non-`{:ok, _}` result from the upload step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1486
# Extracts a URL string from an actor's `url` property, which may be a plain
# string, a link map with a string "href", or a list of either (only the first
# element is considered). Anything else yields `nil`.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([]), do: nil
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1498
# Converts an image property into the internal `%{"type" => "Image", "url" => [...]}`
# shape. A list is reduced to its first element; values without a "url" key
# (including `nil` and the empty list) yield `nil`.
defp normalize_image(%{"url" => url}),
  do: %{"type" => "Image", "url" => [%{"href" => url}]}

defp normalize_image([]), do: nil
defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_other), do: nil
1508
# Translates a remote actor object (`data`, a decoded JSON map) into the
# attribute map used to build or update a remote user.
#
# `additional[:nickname_from_acct]`, when given, is used as the nickname;
# otherwise one is generated from the actor data.
#
# The actor document comes from a remote server, so list-valued properties are
# read defensively: a bare single value is wrapped into a list and entries that
# do not have the expected shape are skipped instead of raising.
#
# Side effects: fetching the featured collection (and possibly a WebFinger
# lookup inside `generate_nickname/1`) may perform network requests.
defp object_to_user_data(data, additional) do
  # Profile fields are "PropertyValue" attachments; ignore anything else,
  # including entries that are not maps or lack a "type".
  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Custom emoji are "Emoji" tags with a name and an icon URL; tags missing
  # either part are skipped rather than crashing the whole actor import.
  emojis =
    data
    |> Map.get("tag", [])
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} when is_binary(name) ->
        [{String.trim(name, ":"), url}]

      _ ->
        []
    end)
    |> Map.new()

  # Read before the object is passed through the Transmogrifier, as before.
  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  # also_known_as must be a list of http(s) URLs; non-string entries are dropped
  also_known_as =
    data
    |> Map.get("alsoKnownAs", [])
    |> List.wrap()
    |> Enum.filter(fn
      url when is_binary(url) -> URI.parse(url).scheme in ["http", "https"]
      _ -> false
    end)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: also_known_as,
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1587
# Builds a `username@host` nickname from the actor's `preferredUsername` and
# the host of its id. When `[WebFinger, :update_nickname_on_user_fetch]` is
# enabled, the acct reported by a WebFinger lookup of that nickname is
# preferred; any lookup failure falls back to the generated value.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  fallback = "#{username}@#{URI.parse(data["id"]).host}"

  with true <- !!Config.get([WebFinger, :update_nickname_on_user_fetch]),
       {:ok, %{"subject" => "acct:" <> acct}} <- WebFinger.finger(fallback) do
    acct
  else
    _ -> fallback
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_data), do: nil
1603
# Fetches the user's remote following and follower collections and derives
# the counters (`totalItems`, 0 when not an integer) and whether each
# collection is hidden. Returns `{:ok, map}` or `{:error, reason}`; any
# non-error failure value is wrapped into `{:error, value}`.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden?} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden?} <- collection_private(followers) do
    follow_information = %{
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"]),
      hide_follows: follows_hidden?,
      hide_followers: followers_hidden?
    }

    {:ok, follow_information}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1623
# Returns `counter` when it is an integer and 0 for every other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1626
# Merges freshly fetched follow counters into `user_data[:info]` when
# `[:instance, :external_user_synchronization]` is `true`, the user type is
# "Person" or "Service", and both collection addresses are present.
# In every other case `user_data` is returned unchanged; unexpected failures
# are logged as errors first.
#
# NOTE(review): the type check reads `user_data[:type]`, while
# `object_to_user_data/2` in this file emits `:actor_type` — confirm which key
# callers actually provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to `false`: nothing to update, nothing to log.
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1657
# Decides whether a remote collection is hidden.
#
#   * An embedded first page of a collection-page type means it is public.
#   * A referenced first page is fetched: a page means public, an HTTP 401/403
#     means private, other failures are returned as `{:error, reason}`.
#   * A collection without a "first" entry is treated as private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1674
# Runs the actor object through the MRF and converts the accepted result into
# user attributes. Any non-`{:ok, _}` MRF result is wrapped into `{:error, result}`.
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    rejection -> {:error, rejection}
  end
end
1682
# Fetches the remote actor at `ap_id` and turns it into user attributes,
# including follow information when that can be updated.
# Failures are returned as `{:error, reason}`; deleted objects and MRF
# rejections are logged at debug level, everything else at error level.
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # If this has been deleted, only log a debug and not an error
    {:error, "Object has been deleted" = reason} = error ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      error

    {:error, {:reject, reason}} = error ->
      Logger.debug("Rejected user #{ap_id}: #{inspect(reason)}")
      error

    {:error, reason} = error ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      error
  end
end
1702
# If another stored user already holds `data[:nickname]` under a different AP
# id, renames that older user to "<id>.<nickname>" so the nickname becomes free.
# When the AP ids are equal only an info message is logged; when there is no
# nickname or no existing holder, `nil` is returned.
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = holder <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == holder.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{holder.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed_nickname = "#{holder.id}.#{holder.nickname}"
    changeset = User.remote_user_changeset(holder, %{nickname: renamed_nickname})
    User.update_and_set_cache(changeset)
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _no_clash ->
      nil
  end
end
1724
# Converts a featured (pinned posts) collection into a map of
# `object_ap_id => NaiveDateTime` (the time of processing).
#
#   * An `OrderedCollection` with a "first" page: the page is fetched and its
#     "orderedItems" are used.
#   * Any other `OrderedCollection`/`Collection`: its items are fetched via
#     `Collections.Fetcher.fetch_collection/1`.
#   * Anything else is not a usable collection.
#
# Always returns a plain map. Fetch failures and unusable input are logged and
# yield `%{}`, so callers can wrap the result without checking its shape.
# Items may be AP id strings or maps with a string "id"; other items are skipped.
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
    page
    |> Map.get("orderedItems")
    |> featured_items_to_pins()
  else
    e ->
      # `first` may be an embedded page map, which cannot be interpolated directly.
      first_ref = if is_binary(first), do: first, else: inspect(first)
      Logger.error("Could not decode featured collection at fetch #{first_ref}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      featured_items_to_pins(objects)

    e ->
      Logger.error(
        "Could not fetch featured collection #{inspect(collection["id"])}, #{inspect(e)}"
      )

      %{}
  end
end

def pin_data_from_featured_collection(obj) do
  Logger.error("Could not parse featured collection #{inspect(obj)}")
  %{}
end

# Builds the pin map from collection items, which can be AP id strings or maps
# carrying an "id"; `nil` (no items) and malformed entries produce no pins.
defp featured_items_to_pins(items) do
  items
  |> List.wrap()
  |> Enum.flat_map(fn
    ap_id when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    %{"id" => ap_id} when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    _ -> []
  end)
  |> Map.new()
end
1755
# Fetches the featured collection at `ap_id` and returns `{:ok, pin_data}`.
# A `nil` address, or any fetch failure (which is logged), yields `{:ok, %{}}`.
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    failure ->
      Logger.error(
        "Could not decode featured collection at fetch #{ap_id}, #{inspect(failure)}"
      )

      {:ok, %{}}
  end
end
1769
# Makes sure every pinned object of the given user data is available locally,
# fetching those that are not cached. Stops at the first object that cannot be
# obtained. Returns `:ok` when all are available, `:error` otherwise, and `nil`
# for `nil` input.
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  case all_available? do
    true -> :ok
    false -> :error
  end
end
1782
# Creates or refreshes the local record for the remote actor at `ap_id`.
#
# A cached user that is not yet AP-enabled is upgraded through the
# Transmogrifier. Otherwise the actor is fetched; on success a background task
# starts fetching its pinned objects, and the user is either updated (when
# already known) or inserted after resolving a possible nickname clash.
# A failed fetch result is returned unchanged.
def make_user_from_ap_id(ap_id, additional \\ []) do
  existing_user = User.get_cached_by_ap_id(ap_id)

  cond do
    existing_user && !User.ap_enabled?(existing_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id, additional) do
        {:ok, data} ->
          {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

          if existing_user do
            changeset = User.remote_user_changeset(existing_user, data)
            User.update_and_set_cache(changeset)
          else
            maybe_handle_clashing_nickname(data)

            inserted = Repo.insert(User.remote_user_changeset(data))
            User.set_cache(inserted)
          end

        failure ->
          failure
      end
  end
end
1807
# Resolves `nickname` through WebFinger and creates the user from the AP id it
# reports, passing the resolved acct along as the nickname. Any WebFinger
# result without a non-nil "ap_id" and an "acct:" subject is an error.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _unresolved ->
      {:error, "No AP id in WebFinger"}
  end
end
1816
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1821
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check for `user`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1826
# Query for all `Create` activities with direct visibility, oldest first (by id).
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1833
# Applies `Activity.restrict_deactivated_users/1` to the query, except for
# `type: "Flag"` queries, which are passed through unchanged.
defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity

defp maybe_restrict_deactivated_users(activity, _opts) do
  Activity.restrict_deactivated_users(activity)
end
1838 end