d700128c07a7067c245d6c9a208188b4c2b42c20
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
55 {recipients, to, cc}
56 end
57
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
64 _ -> false
65 end
66 end
67
68 defp check_actor_can_insert(_), do: true
69
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
73 end
74
75 defp check_remote_limit(_), do: true
76
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
79 end
80
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
83 end
84
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
87 end
88
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
91 "type" => "Create"
92 }) do
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
95 end
96 end
97
98 defp increase_replies_count_if_reply(_create_data), do: :noop
99
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
101 @impl true
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
104 {:ok, object, meta}
105 end
106 end
107
108 @unpersisted_activity_types ~w[Undo Delete Remove Accept Reject]
109 @impl true
110 def persist(%{"type" => type} = object, [local: false] = meta)
111 when type in @unpersisted_activity_types do
112 {:ok, object, meta}
113 {recipients, _, _} = get_recipients(object)
114
115 unpersisted = %Activity{
116 data: object,
117 local: false,
118 recipients: recipients,
119 actor: object["actor"]
120 }
121
122 {:ok, unpersisted, meta}
123 end
124
125 @impl true
126 def persist(object, meta) do
127 with local <- Keyword.fetch!(meta, :local),
128 {recipients, _, _} <- get_recipients(object),
129 {:ok, activity} <-
130 Repo.insert(%Activity{
131 data: object,
132 local: local,
133 recipients: recipients,
134 actor: object["actor"]
135 }),
136 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
137 {:ok, _} <- maybe_create_activity_expiration(activity) do
138 {:ok, activity, meta}
139 end
140 end
141
142 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
143 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
144 with nil <- Activity.normalize(map),
145 map <- lazy_put_activity_defaults(map, fake),
146 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
147 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
148 {:ok, map} <- MRF.filter(map),
149 {recipients, _, _} = get_recipients(map),
150 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
151 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
152 {:ok, map, object} <- insert_full_object(map),
153 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
154 # Splice in the child object if we have one.
155 activity = Maps.put_if_present(activity, :object, object)
156
157 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
158 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
159 end)
160
161 # Add local posts to search index
162 if local, do: Pleroma.Search.add_to_index(activity)
163
164 {:ok, activity}
165 else
166 %Activity{} = activity ->
167 {:ok, activity}
168
169 {:actor_check, _} ->
170 {:error, false}
171
172 {:containment, _} = error ->
173 error
174
175 {:error, _} = error ->
176 error
177
178 {:fake, true, map, recipients} ->
179 activity = %Activity{
180 data: map,
181 local: local,
182 actor: map["actor"],
183 recipients: recipients,
184 id: "pleroma:fakeid"
185 }
186
187 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
188 {:ok, activity}
189
190 {:remote_limit_pass, _} ->
191 {:error, :remote_limit}
192
193 {:reject, _} = e ->
194 {:error, e}
195 end
196 end
197
198 defp insert_activity_with_expiration(data, local, recipients) do
199 struct = %Activity{
200 data: data,
201 local: local,
202 actor: data["actor"],
203 recipients: recipients
204 }
205
206 with {:ok, activity} <- Repo.insert(struct) do
207 maybe_create_activity_expiration(activity)
208 end
209 end
210
211 def notify_and_stream(activity) do
212 Notification.create_notifications(activity)
213
214 original_activity =
215 case activity do
216 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
217 Activity.get_create_by_object_ap_id_with_object(id)
218
219 _ ->
220 activity
221 end
222
223 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
224 participations = get_participations(conversation)
225 stream_out(activity)
226 stream_out_participations(participations)
227 end
228
229 defp maybe_create_activity_expiration(
230 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
231 ) do
232 with {:ok, _job} <-
233 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
234 activity_id: activity.id,
235 expires_at: expires_at
236 }) do
237 {:ok, activity}
238 end
239 end
240
241 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
242
243 defp create_or_bump_conversation(activity, actor) do
244 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
245 %User{} = user <- User.get_cached_by_ap_id(actor) do
246 Participation.mark_as_read(user, conversation)
247 {:ok, conversation}
248 end
249 end
250
251 defp get_participations({:ok, conversation}) do
252 conversation
253 |> Repo.preload(:participations, force: true)
254 |> Map.get(:participations)
255 end
256
257 defp get_participations(_), do: []
258
259 def stream_out_participations(participations) do
260 participations =
261 participations
262 |> Repo.preload(:user)
263
264 Streamer.stream("participation", participations)
265 end
266
267 @impl true
268 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
269 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
270 conversation = Repo.preload(conversation, :participations)
271
272 last_activity_id =
273 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
274 user: user,
275 blocking_user: user
276 })
277
278 if last_activity_id do
279 stream_out_participations(conversation.participations)
280 end
281 end
282 end
283
284 @impl true
285 def stream_out_participations(_, _), do: :noop
286
287 @impl true
288 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
289 when data_type in ["Create", "Announce", "Delete", "Update"] do
290 activity
291 |> Topics.get_activity_topics()
292 |> Streamer.stream(activity)
293 end
294
295 @impl true
296 def stream_out(_activity) do
297 :noop
298 end
299
300 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
301 def create(params, fake \\ false) do
302 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
303 result
304 end
305 end
306
307 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
308 additional = params[:additional] || %{}
309 # only accept false as false value
310 local = !(params[:local] == false)
311 published = params[:published]
312 quick_insert? = Config.get([:env]) == :benchmark
313
314 create_data =
315 make_create_data(
316 %{to: to, actor: actor, published: published, context: context, object: object},
317 additional
318 )
319
320 with {:ok, activity} <- insert(create_data, local, fake),
321 {:fake, false, activity} <- {:fake, fake, activity},
322 _ <- increase_replies_count_if_reply(create_data),
323 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
324 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
325 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_schedule_poll_notifications(activity),
328 :ok <- maybe_federate(activity) do
329 {:ok, activity}
330 else
331 {:quick_insert, true, activity} ->
332 {:ok, activity}
333
334 {:fake, true, activity} ->
335 {:ok, activity}
336
337 {:error, message} ->
338 Repo.rollback(message)
339 end
340 end
341
342 defp maybe_schedule_poll_notifications(activity) do
343 PollWorker.schedule_poll_end(activity)
344 :ok
345 end
346
347 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
348 {:ok, Activity.t()} | nil | {:error, any()}
349 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
350 with {:ok, result} <-
351 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
352 result
353 end
354 end
355
356 defp do_unfollow(follower, followed, activity_id, local)
357
358 defp do_unfollow(follower, followed, activity_id, local) when local == true do
359 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
360 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
361 {:ok, activity} <- insert(unfollow_data, local),
362 {:ok, _activity} <- Repo.delete(follow_activity),
363 _ <- notify_and_stream(activity),
364 :ok <- maybe_federate(activity) do
365 {:ok, activity}
366 else
367 nil -> nil
368 {:error, error} -> Repo.rollback(error)
369 end
370 end
371
372 defp do_unfollow(follower, followed, activity_id, false) do
373 # On a remote unfollow, _remove_ their activity from the database, since some software (MISSKEEEEY)
374 # uses deterministic ids for follows.
375 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
376 {:ok, _activity} <- Repo.delete(follow_activity),
377 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
378 unfollow_activity <- make_unfollow_activity(unfollow_data, false),
379 _ <- notify_and_stream(unfollow_activity) do
380 {:ok, unfollow_activity}
381 else
382 nil -> nil
383 {:error, error} -> Repo.rollback(error)
384 end
385 end
386
387 defp make_unfollow_activity(data, local) do
388 {recipients, _, _} = get_recipients(data)
389
390 %Activity{
391 data: data,
392 local: local,
393 actor: data["actor"],
394 recipients: recipients
395 }
396 end
397
398 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
399 def flag(params) do
400 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
401 result
402 end
403 end
404
405 defp do_flag(
406 %{
407 actor: actor,
408 context: _context,
409 account: account,
410 statuses: statuses,
411 content: content
412 } = params
413 ) do
414 # only accept false as false value
415 local = !(params[:local] == false)
416 forward = !(params[:forward] == false)
417
418 additional = params[:additional] || %{}
419
420 additional =
421 if forward do
422 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
423 else
424 Map.merge(additional, %{"to" => [], "cc" => []})
425 end
426
427 with flag_data <- make_flag_data(params, additional),
428 {:ok, activity} <- insert(flag_data, local),
429 {:ok, stripped_activity} <- strip_report_status_data(activity),
430 _ <- notify_and_stream(activity),
431 :ok <-
432 maybe_federate(stripped_activity) do
433 User.all_superusers()
434 |> Enum.filter(fn user -> user.ap_id != actor end)
435 |> Enum.filter(fn user -> not is_nil(user.email) end)
436 |> Enum.each(fn superuser ->
437 superuser
438 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
439 |> Pleroma.Emails.Mailer.deliver_async()
440 end)
441
442 {:ok, activity}
443 else
444 {:error, error} -> Repo.rollback(error)
445 end
446 end
447
448 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
449 def move(%User{} = origin, %User{} = target, local \\ true) do
450 params = %{
451 "type" => "Move",
452 "actor" => origin.ap_id,
453 "object" => origin.ap_id,
454 "target" => target.ap_id,
455 "to" => [origin.follower_address]
456 }
457
458 with true <- origin.ap_id in target.also_known_as,
459 {:ok, activity} <- insert(params, local),
460 _ <- notify_and_stream(activity) do
461 maybe_federate(activity)
462
463 BackgroundWorker.enqueue("move_following", %{
464 "origin_id" => origin.id,
465 "target_id" => target.id
466 })
467
468 {:ok, activity}
469 else
470 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
471 err -> err
472 end
473 end
474
475 def fetch_activities_for_context_query(context, opts) do
476 public = [Constants.as_public()]
477
478 recipients =
479 if opts[:user],
480 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
481 else: public
482
483 from(activity in Activity)
484 |> maybe_preload_objects(opts)
485 |> maybe_preload_bookmarks(opts)
486 |> maybe_set_thread_muted_field(opts)
487 |> restrict_blocked(opts)
488 |> restrict_blockers_visibility(opts)
489 |> restrict_recipients(recipients, opts[:user])
490 |> restrict_filtered(opts)
491 |> where(
492 [activity],
493 fragment(
494 "?->>'type' = ? and ?->>'context' = ?",
495 activity.data,
496 "Create",
497 activity.data,
498 ^context
499 )
500 )
501 |> exclude_poll_votes(opts)
502 |> exclude_id(opts)
503 |> order_by([activity], desc: activity.id)
504 end
505
506 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
507 def fetch_activities_for_context(context, opts \\ %{}) do
508 context
509 |> fetch_activities_for_context_query(opts)
510 |> Repo.all()
511 end
512
513 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
514 FlakeId.Ecto.CompatType.t() | nil
515 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
516 context
517 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
518 |> restrict_visibility(%{visibility: "direct"})
519 |> limit(1)
520 |> select([a], a.id)
521 |> Repo.one()
522 end
523
524 defp fetch_paginated_optimized(query, opts, pagination) do
525 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
526 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
527 opts = Map.put(opts, :skip_extra_order, true)
528
529 Pagination.fetch_paginated(query, opts, pagination)
530 end
531
532 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
533 list_memberships = Pleroma.List.memberships(opts[:user])
534
535 fetch_activities_query(recipients ++ list_memberships, opts)
536 |> fetch_paginated_optimized(opts, pagination)
537 |> Enum.reverse()
538 |> maybe_update_cc(list_memberships, opts[:user])
539 end
540
541 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
542 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
543 includes_local_public = Map.get(opts, :includes_local_public, false)
544
545 opts = Map.delete(opts, :user)
546
547 intended_recipients =
548 if includes_local_public do
549 [Constants.as_public(), as_local_public()]
550 else
551 [Constants.as_public()]
552 end
553
554 intended_recipients
555 |> fetch_activities_query(opts)
556 |> restrict_unlisted(opts)
557 |> fetch_paginated_optimized(opts, pagination)
558 end
559
560 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
561 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
562 opts
563 |> Map.put(:restrict_unlisted, true)
564 |> fetch_public_or_unlisted_activities(pagination)
565 end
566
567 @valid_visibilities ~w[direct unlisted public private]
568
569 defp restrict_visibility(query, %{visibility: visibility})
570 when is_list(visibility) do
571 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
572 from(
573 a in query,
574 where:
575 fragment(
576 "activity_visibility(?, ?, ?) = ANY (?)",
577 a.actor,
578 a.recipients,
579 a.data,
580 ^visibility
581 )
582 )
583 else
584 Logger.error("Could not restrict visibility to #{visibility}")
585 end
586 end
587
588 defp restrict_visibility(query, %{visibility: visibility})
589 when visibility in @valid_visibilities do
590 from(
591 a in query,
592 where:
593 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
594 )
595 end
596
597 defp restrict_visibility(_query, %{visibility: visibility})
598 when visibility not in @valid_visibilities do
599 Logger.error("Could not restrict visibility to #{visibility}")
600 end
601
602 defp restrict_visibility(query, _visibility), do: query
603
604 defp exclude_visibility(query, %{exclude_visibilities: visibility})
605 when is_list(visibility) do
606 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
607 from(
608 a in query,
609 where:
610 not fragment(
611 "activity_visibility(?, ?, ?) = ANY (?)",
612 a.actor,
613 a.recipients,
614 a.data,
615 ^visibility
616 )
617 )
618 else
619 Logger.error("Could not exclude visibility to #{visibility}")
620 query
621 end
622 end
623
624 defp exclude_visibility(query, %{exclude_visibilities: visibility})
625 when visibility in @valid_visibilities do
626 from(
627 a in query,
628 where:
629 not fragment(
630 "activity_visibility(?, ?, ?) = ?",
631 a.actor,
632 a.recipients,
633 a.data,
634 ^visibility
635 )
636 )
637 end
638
639 defp exclude_visibility(query, %{exclude_visibilities: visibility})
640 when visibility not in [nil | @valid_visibilities] do
641 Logger.error("Could not exclude visibility to #{visibility}")
642 query
643 end
644
645 defp exclude_visibility(query, _visibility), do: query
646
647 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
648 do: query
649
650 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
651 do: query
652
653 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
654 local_public = as_local_public()
655
656 from(
657 a in query,
658 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
659 )
660 end
661
662 defp restrict_thread_visibility(query, _, _), do: query
663
664 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
665 params =
666 params
667 |> Map.put(:user, reading_user)
668 |> Map.put(:actor_id, user.ap_id)
669
670 %{
671 godmode: params[:godmode],
672 reading_user: reading_user
673 }
674 |> user_activities_recipients()
675 |> fetch_activities(params)
676 |> Enum.reverse()
677 end
678
679 def fetch_user_activities(user, reading_user, params \\ %{})
680
681 def fetch_user_activities(user, reading_user, %{total: true} = params) do
682 result = fetch_activities_for_user(user, reading_user, params)
683
684 Keyword.put(result, :items, Enum.reverse(result[:items]))
685 end
686
687 def fetch_user_activities(user, reading_user, params) do
688 user
689 |> fetch_activities_for_user(reading_user, params)
690 |> Enum.reverse()
691 end
692
693 defp fetch_activities_for_user(user, reading_user, params) do
694 params =
695 params
696 |> Map.put(:type, ["Create", "Announce"])
697 |> Map.put(:user, reading_user)
698 |> Map.put(:actor_id, user.ap_id)
699 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
700
701 params =
702 if User.blocks?(reading_user, user) do
703 params
704 else
705 params
706 |> Map.put(:blocking_user, reading_user)
707 |> Map.put(:muting_user, reading_user)
708 end
709
710 pagination_type = Map.get(params, :pagination_type) || :keyset
711
712 %{
713 godmode: params[:godmode],
714 reading_user: reading_user
715 }
716 |> user_activities_recipients()
717 |> fetch_activities(params, pagination_type)
718 end
719
720 def fetch_statuses(reading_user, %{total: true} = params) do
721 result = fetch_activities_for_reading_user(reading_user, params)
722 Keyword.put(result, :items, Enum.reverse(result[:items]))
723 end
724
725 def fetch_statuses(reading_user, params) do
726 reading_user
727 |> fetch_activities_for_reading_user(params)
728 |> Enum.reverse()
729 end
730
731 defp fetch_activities_for_reading_user(reading_user, params) do
732 params = Map.put(params, :type, ["Create", "Announce"])
733
734 %{
735 godmode: params[:godmode],
736 reading_user: reading_user
737 }
738 |> user_activities_recipients()
739 |> fetch_activities(params, :offset)
740 end
741
742 def user_activities_recipients(%{godmode: true}), do: []
743
744 def user_activities_recipients(%{reading_user: reading_user}) do
745 if not is_nil(reading_user) and reading_user.local do
746 [
747 Constants.as_public(),
748 as_local_public(),
749 reading_user.ap_id | User.following(reading_user)
750 ]
751 else
752 [Constants.as_public()]
753 end
754 end
755
756 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
757 raise "Can't use the child object without preloading!"
758 end
759
760 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
761 from(
762 [activity, object] in query,
763 where:
764 fragment(
765 "?->>'type' != ? or ?->>'actor' != ?",
766 activity.data,
767 "Announce",
768 object.data,
769 ^actor
770 )
771 )
772 end
773
774 defp restrict_announce_object_actor(query, _), do: query
775
776 defp restrict_since(query, %{since_id: ""}), do: query
777
778 defp restrict_since(query, %{since_id: since_id}) do
779 from(activity in query, where: activity.id > ^since_id)
780 end
781
782 defp restrict_since(query, _), do: query
783
784 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
785 raise_on_missing_preload()
786 end
787
788 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
789 from(
790 [_activity, object] in query,
791 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
792 )
793 end
794
795 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
796 restrict_embedded_tag_any(query, %{tag: tag})
797 end
798
799 defp restrict_embedded_tag_all(query, _), do: query
800
801 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
802 raise_on_missing_preload()
803 end
804
805 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
806 from(
807 [_activity, object] in query,
808 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
809 )
810 end
811
812 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
813 restrict_embedded_tag_any(query, %{tag: [tag]})
814 end
815
816 defp restrict_embedded_tag_any(query, _), do: query
817
818 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
819 raise_on_missing_preload()
820 end
821
822 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
823 from(
824 [_activity, object] in query,
825 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
826 )
827 end
828
829 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
830 when is_binary(tag_reject) do
831 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
832 end
833
834 defp restrict_embedded_tag_reject_any(query, _), do: query
835
836 defp object_ids_query_for_tags(tags) do
837 from(hto in "hashtags_objects")
838 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
839 |> where([hto, ht], ht.name in ^tags)
840 |> select([hto], hto.object_id)
841 |> distinct([hto], true)
842 end
843
844 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
845 raise_on_missing_preload()
846 end
847
848 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
849 restrict_hashtag_any(query, %{tag: single_tag})
850 end
851
852 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
853 from(
854 [_activity, object] in query,
855 where:
856 fragment(
857 """
858 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
859 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
860 AND hashtags_objects.object_id = ?) @> ?
861 """,
862 ^tags,
863 object.id,
864 ^tags
865 )
866 )
867 end
868
869 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
870 restrict_hashtag_all(query, %{tag_all: [tag]})
871 end
872
873 defp restrict_hashtag_all(query, _), do: query
874
875 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
876 raise_on_missing_preload()
877 end
878
879 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
880 hashtag_ids =
881 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
882 |> Repo.all()
883
884 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
885 from(
886 [_activity, object] in query,
887 join: hto in "hashtags_objects",
888 on: hto.object_id == object.id,
889 where: hto.hashtag_id in ^hashtag_ids,
890 distinct: [desc: object.id],
891 order_by: [desc: object.id]
892 )
893 end
894
895 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
896 restrict_hashtag_any(query, %{tag: [tag]})
897 end
898
899 defp restrict_hashtag_any(query, _), do: query
900
901 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
902 raise_on_missing_preload()
903 end
904
905 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
906 from(
907 [_activity, object] in query,
908 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
909 )
910 end
911
912 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
913 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
914 end
915
916 defp restrict_hashtag_reject_any(query, _), do: query
917
918 defp raise_on_missing_preload do
919 raise "Can't use the child object without preloading!"
920 end
921
922 defp restrict_recipients(query, [], _user), do: query
923
924 defp restrict_recipients(query, recipients, nil) do
925 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
926 end
927
928 defp restrict_recipients(query, recipients, user) do
929 from(
930 activity in query,
931 where: fragment("? && ?", ^recipients, activity.recipients),
932 or_where: activity.actor == ^user.ap_id
933 )
934 end
935
936 # Essentially, either look for activities addressed to `recipients`, _OR_ ones
937 # that reference a hashtag that the user follows
938 # Firstly, two fallbacks in case there's no hashtag constraint, or the user doesn't
939 # follow any
940 defp restrict_recipients_or_hashtags(query, recipients, user, nil) do
941 restrict_recipients(query, recipients, user)
942 end
943
944 defp restrict_recipients_or_hashtags(query, recipients, user, []) do
945 restrict_recipients(query, recipients, user)
946 end
947
948 defp restrict_recipients_or_hashtags(query, recipients, _user, hashtag_ids) do
949 from([activity, object] in query)
950 |> join(:left, [activity, object], hto in "hashtags_objects",
951 on: hto.object_id == object.id,
952 as: :hto
953 )
954 |> where(
955 [activity, object, hto: hto],
956 (hto.hashtag_id in ^hashtag_ids and ^Constants.as_public() in activity.recipients) or
957 fragment("? && ?", ^recipients, activity.recipients)
958 )
959 end
960
961 defp restrict_local(query, %{local_only: true}) do
962 from(activity in query, where: activity.local == true)
963 end
964
965 defp restrict_local(query, _), do: query
966
967 defp restrict_remote(query, %{remote: true}) do
968 from(activity in query, where: activity.local == false)
969 end
970
971 defp restrict_remote(query, _), do: query
972
973 defp restrict_actor(query, %{actor_id: actor_id}) do
974 from(activity in query, where: activity.actor == ^actor_id)
975 end
976
977 defp restrict_actor(query, _), do: query
978
979 defp restrict_type(query, %{type: type}) when is_binary(type) do
980 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
981 end
982
983 defp restrict_type(query, %{type: type}) do
984 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
985 end
986
987 defp restrict_type(query, _), do: query
988
989 defp restrict_state(query, %{state: state}) do
990 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
991 end
992
993 defp restrict_state(query, _), do: query
994
995 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
996 from(
997 [_activity, object] in query,
998 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
999 )
1000 end
1001
1002 defp restrict_favorited_by(query, _), do: query
1003
1004 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
1005 raise "Can't use the child object without preloading!"
1006 end
1007
1008 defp restrict_media(query, %{only_media: true}) do
1009 from(
1010 [activity, object] in query,
1011 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
1012 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
1013 )
1014 end
1015
1016 defp restrict_media(query, _), do: query
1017
1018 defp restrict_replies(query, %{exclude_replies: true}) do
1019 from(
1020 [_activity, object] in query,
1021 where: fragment("?->>'inReplyTo' is null", object.data)
1022 )
1023 end
1024
1025 defp restrict_replies(query, %{
1026 reply_filtering_user: %User{} = user,
1027 reply_visibility: "self"
1028 }) do
1029 from(
1030 [activity, object] in query,
1031 where:
1032 fragment(
1033 "?->>'inReplyTo' is null OR ? = ANY(?)",
1034 object.data,
1035 ^user.ap_id,
1036 activity.recipients
1037 )
1038 )
1039 end
1040
1041 defp restrict_replies(query, %{
1042 reply_filtering_user: %User{} = user,
1043 reply_visibility: "following"
1044 }) do
1045 from(
1046 [activity, object] in query,
1047 where:
1048 fragment(
1049 """
1050 ?->>'type' != 'Create' -- This isn't a Create
1051 OR ?->>'inReplyTo' is null -- this isn't a reply
1052 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1053 -- unless they are the author (because authors
1054 -- are also part of the recipients). This leads
1055 -- to a bug that self-replies by friends won't
1056 -- show up.
1057 OR ? = ? -- The actor is us
1058 """,
1059 activity.data,
1060 object.data,
1061 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1062 activity.recipients,
1063 activity.actor,
1064 activity.actor,
1065 ^user.ap_id
1066 )
1067 )
1068 end
1069
1070 defp restrict_replies(query, _), do: query
1071
1072 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1073 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1074 end
1075
1076 defp restrict_reblogs(query, _), do: query
1077
1078 defp restrict_muted(query, %{with_muted: true}), do: query
1079
1080 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1081 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1082
1083 query =
1084 from([activity] in query,
1085 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1086 where:
1087 fragment(
1088 "not (?->'to' \\?| ?) or ? = ?",
1089 activity.data,
1090 ^mutes,
1091 activity.actor,
1092 ^user.ap_id
1093 )
1094 )
1095
1096 unless opts[:skip_preload] do
1097 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1098 else
1099 query
1100 end
1101 end
1102
1103 defp restrict_muted(query, _), do: query
1104
1105 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1106 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1107 domain_blocks = user.domain_blocks || []
1108
1109 following_ap_ids = User.get_friends_ap_ids(user)
1110
1111 query =
1112 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1113
1114 from(
1115 [activity, object: o] in query,
1116 # You don't block the author
1117 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1118
1119 # You don't block any recipients, and didn't author the post
1120 where:
1121 fragment(
1122 "((not (? && ?)) or ? = ?)",
1123 activity.recipients,
1124 ^blocked_ap_ids,
1125 activity.actor,
1126 ^user.ap_id
1127 ),
1128
1129 # You don't block the domain of any recipients, and didn't author the post
1130 where:
1131 fragment(
1132 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1133 activity.recipients,
1134 ^domain_blocks,
1135 activity.actor,
1136 ^user.ap_id
1137 ),
1138
1139 # It's not a boost of a user you block
1140 where:
1141 fragment(
1142 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1143 activity.data,
1144 activity.data,
1145 ^blocked_ap_ids
1146 ),
1147
1148 # You don't block the author's domain, and also don't follow the author
1149 where:
1150 fragment(
1151 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1152 activity.actor,
1153 ^domain_blocks,
1154 activity.actor,
1155 ^following_ap_ids
1156 ),
1157
1158 # Same as above, but checks the Object
1159 where:
1160 fragment(
1161 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1162 o.data,
1163 ^domain_blocks,
1164 o.data,
1165 ^following_ap_ids
1166 )
1167 )
1168 end
1169
1170 defp restrict_blocked(query, _), do: query
1171
1172 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1173 if Config.get([:activitypub, :blockers_visible]) == true do
1174 query
1175 else
1176 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1177
1178 from(
1179 activity in query,
1180 # The author doesn't block you
1181 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1182
1183 # It's not a boost of a user that blocks you
1184 where:
1185 fragment(
1186 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1187 activity.data,
1188 activity.data,
1189 ^blocker_ap_ids
1190 )
1191 )
1192 end
1193 end
1194
1195 defp restrict_blockers_visibility(query, _), do: query
1196
1197 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1198 from(
1199 activity in query,
1200 where:
1201 fragment(
1202 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1203 activity.data,
1204 ^[Constants.as_public()]
1205 )
1206 )
1207 end
1208
1209 defp restrict_unlisted(query, _), do: query
1210
1211 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1212 from(
1213 [activity, object: o] in query,
1214 where:
1215 fragment(
1216 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1217 activity.data,
1218 activity.data,
1219 activity.data,
1220 ^ids
1221 )
1222 )
1223 end
1224
1225 defp restrict_pinned(query, _), do: query
1226
1227 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1228 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1229
1230 from(
1231 activity in query,
1232 where:
1233 fragment(
1234 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1235 activity.data,
1236 activity.actor,
1237 ^muted_reblogs
1238 )
1239 )
1240 end
1241
1242 defp restrict_muted_reblogs(query, _), do: query
1243
1244 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1245 from(
1246 activity in query,
1247 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1248 )
1249 end
1250
1251 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1252 from(
1253 activity in query,
1254 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1255 )
1256 end
1257
1258 defp restrict_instance(query, _), do: query
1259
1260 defp restrict_filtered(query, %{user: %User{} = user}) do
1261 case Filter.compose_regex(user) do
1262 nil ->
1263 query
1264
1265 regex ->
1266 from([activity, object] in query,
1267 where:
1268 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1269 activity.actor == ^user.ap_id
1270 )
1271 end
1272 end
1273
1274 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1275 restrict_filtered(query, %{user: user})
1276 end
1277
1278 defp restrict_filtered(query, _), do: query
1279
1280 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1281
1282 defp exclude_poll_votes(query, _) do
1283 if has_named_binding?(query, :object) do
1284 from([activity, object: o] in query,
1285 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1286 )
1287 else
1288 query
1289 end
1290 end
1291
1292 defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1293 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1294
1295 defp exclude_invisible_actors(query, _opts) do
1296 query
1297 |> join(:inner, [activity], u in User,
1298 as: :u,
1299 on: activity.actor == u.ap_id and u.invisible == false
1300 )
1301 end
1302
1303 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1304 from(activity in query, where: activity.id != ^id)
1305 end
1306
1307 defp exclude_id(query, _), do: query
1308
1309 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1310
1311 defp maybe_preload_objects(query, _) do
1312 query
1313 |> Activity.with_preloaded_object()
1314 end
1315
1316 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1317
1318 defp maybe_preload_bookmarks(query, opts) do
1319 query
1320 |> Activity.with_preloaded_bookmark(opts[:user])
1321 end
1322
1323 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1324 query
1325 |> Activity.with_preloaded_report_notes()
1326 end
1327
1328 defp maybe_preload_report_notes(query, _), do: query
1329
1330 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1331
1332 defp maybe_set_thread_muted_field(query, opts) do
1333 query
1334 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1335 end
1336
1337 defp maybe_order(query, %{order: :desc}) do
1338 query
1339 |> order_by(desc: :id)
1340 end
1341
1342 defp maybe_order(query, %{order: :asc}) do
1343 query
1344 |> order_by(asc: :id)
1345 end
1346
1347 defp maybe_order(query, _), do: query
1348
1349 defp normalize_fetch_activities_query_opts(opts) do
1350 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1351 case opts[key] do
1352 value when is_bitstring(value) ->
1353 Map.put(opts, key, Hashtag.normalize_name(value))
1354
1355 value when is_list(value) ->
1356 normalized_value =
1357 value
1358 |> Enum.map(&Hashtag.normalize_name/1)
1359 |> Enum.uniq()
1360
1361 Map.put(opts, key, normalized_value)
1362
1363 _ ->
1364 opts
1365 end
1366 end)
1367 end
1368
1369 defp fetch_activities_query_ap_ids_ops(opts) do
1370 source_user = opts[:muting_user]
1371 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1372
1373 ap_id_relationships =
1374 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1375 [:block | ap_id_relationships]
1376 else
1377 ap_id_relationships
1378 end
1379
1380 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1381
1382 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1383 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1384
1385 restrict_muted_reblogs_opts =
1386 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1387
1388 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1389 end
1390
1391 def fetch_activities_query(recipients, opts \\ %{}) do
1392 opts = normalize_fetch_activities_query_opts(opts)
1393
1394 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1395 fetch_activities_query_ap_ids_ops(opts)
1396
1397 config = %{
1398 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1399 }
1400
1401 query =
1402 Activity
1403 |> maybe_preload_objects(opts)
1404 |> maybe_preload_bookmarks(opts)
1405 |> maybe_preload_report_notes(opts)
1406 |> maybe_set_thread_muted_field(opts)
1407 |> maybe_order(opts)
1408 |> restrict_recipients_or_hashtags(recipients, opts[:user], opts[:followed_hashtags])
1409 |> restrict_replies(opts)
1410 |> restrict_since(opts)
1411 |> restrict_local(opts)
1412 |> restrict_remote(opts)
1413 |> restrict_actor(opts)
1414 |> restrict_type(opts)
1415 |> restrict_state(opts)
1416 |> restrict_favorited_by(opts)
1417 |> restrict_blocked(restrict_blocked_opts)
1418 |> restrict_blockers_visibility(opts)
1419 |> restrict_muted(restrict_muted_opts)
1420 |> restrict_filtered(opts)
1421 |> restrict_media(opts)
1422 |> restrict_visibility(opts)
1423 |> restrict_thread_visibility(opts, config)
1424 |> restrict_reblogs(opts)
1425 |> restrict_pinned(opts)
1426 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1427 |> restrict_instance(opts)
1428 |> restrict_announce_object_actor(opts)
1429 |> restrict_filtered(opts)
1430 |> maybe_restrict_deactivated_users(opts)
1431 |> exclude_poll_votes(opts)
1432 |> exclude_invisible_actors(opts)
1433 |> exclude_visibility(opts)
1434
1435 if Config.feature_enabled?(:improved_hashtag_timeline) do
1436 query
1437 |> restrict_hashtag_any(opts)
1438 |> restrict_hashtag_all(opts)
1439 |> restrict_hashtag_reject_any(opts)
1440 else
1441 query
1442 |> restrict_embedded_tag_any(opts)
1443 |> restrict_embedded_tag_all(opts)
1444 |> restrict_embedded_tag_reject_any(opts)
1445 end
1446 end
1447
1448 @doc """
1449 Fetch favorites activities of user with order by sort adds to favorites
1450 """
1451 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1452 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1453 user.ap_id
1454 |> Activity.Queries.by_actor()
1455 |> Activity.Queries.by_type("Like")
1456 |> Activity.with_joined_object()
1457 |> Object.with_joined_activity()
1458 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1459 |> order_by([like, _, _], desc_nulls_last: like.id)
1460 |> Pagination.fetch_paginated(
1461 Map.merge(params, %{skip_order: true}),
1462 pagination
1463 )
1464 end
1465
1466 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1467 Enum.map(activities, fn
1468 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1469 if Enum.any?(bcc, &(&1 in list_memberships)) do
1470 update_in(activity.data["cc"], &[user_ap_id | &1])
1471 else
1472 activity
1473 end
1474
1475 activity ->
1476 activity
1477 end)
1478 end
1479
1480 defp maybe_update_cc(activities, _, _), do: activities
1481
1482 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1483 from(activity in query,
1484 where:
1485 fragment("? && ?", activity.recipients, ^recipients) or
1486 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1487 ^Constants.as_public() in activity.recipients)
1488 )
1489 end
1490
1491 def fetch_activities_bounded(
1492 recipients,
1493 recipients_with_public,
1494 opts \\ %{},
1495 pagination \\ :keyset
1496 ) do
1497 fetch_activities_query([], opts)
1498 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1499 |> Pagination.fetch_paginated(opts, pagination)
1500 |> Enum.reverse()
1501 end
1502
1503 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1504 def upload(file, opts \\ []) do
1505 with {:ok, data} <- Upload.store(file, opts) do
1506 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1507
1508 Repo.insert(%Object{data: obj_data})
1509 end
1510 end
1511
1512 @spec get_actor_url(any()) :: binary() | nil
1513 defp get_actor_url(url) when is_binary(url), do: url
1514 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1515
1516 defp get_actor_url(url) when is_list(url) do
1517 url
1518 |> List.first()
1519 |> get_actor_url()
1520 end
1521
1522 defp get_actor_url(_url), do: nil
1523
1524 defp normalize_image(%{"url" => url}) do
1525 %{
1526 "type" => "Image",
1527 "url" => [%{"href" => url}]
1528 }
1529 end
1530
1531 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1532 defp normalize_image(_), do: nil
1533
1534 defp object_to_user_data(data, additional) do
1535 fields =
1536 data
1537 |> Map.get("attachment", [])
1538 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1539 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1540
1541 emojis =
1542 data
1543 |> Map.get("tag", [])
1544 |> Enum.filter(fn
1545 %{"type" => "Emoji"} -> true
1546 _ -> false
1547 end)
1548 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1549 {String.trim(name, ":"), url}
1550 end)
1551
1552 is_locked = data["manuallyApprovesFollowers"] || false
1553 data = Transmogrifier.maybe_fix_user_object(data)
1554 is_discoverable = data["discoverable"] || false
1555 invisible = data["invisible"] || false
1556 actor_type = data["type"] || "Person"
1557
1558 featured_address = data["featured"]
1559 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1560
1561 public_key =
1562 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1563 data["publicKey"]["publicKeyPem"]
1564 end
1565
1566 shared_inbox =
1567 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1568 data["endpoints"]["sharedInbox"]
1569 end
1570
1571 # if WebFinger request was already done, we probably have acct, otherwise
1572 # we request WebFinger here
1573 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1574
1575 # also_known_as must be a URL
1576 also_known_as =
1577 data
1578 |> Map.get("alsoKnownAs", [])
1579 |> Enum.filter(fn url ->
1580 case URI.parse(url) do
1581 %URI{scheme: "http"} -> true
1582 %URI{scheme: "https"} -> true
1583 _ -> false
1584 end
1585 end)
1586
1587 %{
1588 ap_id: data["id"],
1589 uri: get_actor_url(data["url"]),
1590 ap_enabled: true,
1591 banner: normalize_image(data["image"]),
1592 fields: fields,
1593 emoji: emojis,
1594 is_locked: is_locked,
1595 is_discoverable: is_discoverable,
1596 invisible: invisible,
1597 avatar: normalize_image(data["icon"]),
1598 name: data["name"],
1599 follower_address: data["followers"],
1600 following_address: data["following"],
1601 featured_address: featured_address,
1602 bio: data["summary"] || "",
1603 actor_type: actor_type,
1604 also_known_as: also_known_as,
1605 public_key: public_key,
1606 inbox: data["inbox"],
1607 shared_inbox: shared_inbox,
1608 pinned_objects: pinned_objects,
1609 nickname: nickname
1610 }
1611 end
1612
1613 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1614 generated = "#{username}@#{URI.parse(data["id"]).host}"
1615
1616 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1617 case WebFinger.finger(generated) do
1618 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1619 _ -> generated
1620 end
1621 else
1622 generated
1623 end
1624 end
1625
1626 # nickname can be nil because of virtual actors
1627 defp generate_nickname(_), do: nil
1628
1629 def fetch_follow_information_for_user(user) do
1630 with {:ok, following_data} <-
1631 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1632 {:ok, hide_follows} <- collection_private(following_data),
1633 {:ok, followers_data} <-
1634 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1635 {:ok, hide_followers} <- collection_private(followers_data) do
1636 {:ok,
1637 %{
1638 hide_follows: hide_follows,
1639 follower_count: normalize_counter(followers_data["totalItems"]),
1640 following_count: normalize_counter(following_data["totalItems"]),
1641 hide_followers: hide_followers
1642 }}
1643 else
1644 {:error, _} = e -> e
1645 e -> {:error, e}
1646 end
1647 end
1648
1649 defp normalize_counter(counter) when is_integer(counter), do: counter
1650 defp normalize_counter(_), do: 0
1651
1652 def maybe_update_follow_information(user_data) do
1653 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1654 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1655 {_, true} <-
1656 {:collections_available,
1657 !!(user_data[:following_address] && user_data[:follower_address])},
1658 {:ok, info} <-
1659 fetch_follow_information_for_user(user_data) do
1660 info = Map.merge(user_data[:info] || %{}, info)
1661
1662 user_data
1663 |> Map.put(:info, info)
1664 else
1665 {:user_type_check, false} ->
1666 user_data
1667
1668 {:collections_available, false} ->
1669 user_data
1670
1671 {:enabled, false} ->
1672 user_data
1673
1674 e ->
1675 Logger.error(
1676 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1677 )
1678
1679 user_data
1680 end
1681 end
1682
1683 defp collection_private(%{"first" => %{"type" => type}})
1684 when type in ["CollectionPage", "OrderedCollectionPage"],
1685 do: {:ok, false}
1686
1687 defp collection_private(%{"first" => first}) do
1688 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1689 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1690 {:ok, false}
1691 else
1692 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1693 {:error, _} = e -> e
1694 e -> {:error, e}
1695 end
1696 end
1697
1698 defp collection_private(_data), do: {:ok, true}
1699
1700 def user_data_from_user_object(data, additional \\ []) do
1701 with {:ok, data} <- MRF.filter(data) do
1702 {:ok, object_to_user_data(data, additional)}
1703 else
1704 e -> {:error, e}
1705 end
1706 end
1707
1708 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1709 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1710 {:ok, data} <- user_data_from_user_object(data, additional) do
1711 {:ok, maybe_update_follow_information(data)}
1712 else
1713 # If this has been deleted, only log a debug and not an error
1714 {:error, "Object has been deleted" = e} ->
1715 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1716 {:error, e}
1717
1718 {:error, {:reject, reason} = e} ->
1719 Logger.debug("Rejected user #{ap_id}: #{inspect(reason)}")
1720 {:error, e}
1721
1722 {:error, e} ->
1723 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1724 {:error, e}
1725 end
1726 end
1727
1728 def maybe_handle_clashing_nickname(data) do
1729 with nickname when is_binary(nickname) <- data[:nickname],
1730 %User{} = old_user <- User.get_by_nickname(nickname),
1731 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1732 Logger.info(
1733 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1734 )
1735
1736 old_user
1737 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1738 |> User.update_and_set_cache()
1739 else
1740 {:ap_id_comparison, true} ->
1741 Logger.info(
1742 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1743 )
1744
1745 _ ->
1746 nil
1747 end
1748 end
1749
1750 def pin_data_from_featured_collection(%{
1751 "type" => "OrderedCollection",
1752 "first" => first
1753 }) do
1754 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1755 page
1756 |> Map.get("orderedItems")
1757 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1758 else
1759 e ->
1760 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1761 {:ok, %{}}
1762 end
1763 end
1764
1765 def pin_data_from_featured_collection(
1766 %{
1767 "type" => type
1768 } = collection
1769 )
1770 when type in ["OrderedCollection", "Collection"] do
1771 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1772
1773 # Items can either be a map _or_ a string
1774 objects
1775 |> Map.new(fn
1776 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1777 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1778 end)
1779 end
1780
1781 def fetch_and_prepare_featured_from_ap_id(nil) do
1782 {:ok, %{}}
1783 end
1784
1785 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1786 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1787 {:ok, pin_data_from_featured_collection(data)}
1788 else
1789 e ->
1790 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1791 {:ok, %{}}
1792 end
1793 end
1794
1795 def pinned_fetch_task(nil), do: nil
1796
1797 def pinned_fetch_task(%{pinned_objects: pins}) do
1798 if Enum.all?(pins, fn {ap_id, _} ->
1799 Object.get_cached_by_ap_id(ap_id) ||
1800 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1801 end) do
1802 :ok
1803 else
1804 :error
1805 end
1806 end
1807
1808 def make_user_from_ap_id(ap_id, additional \\ []) do
1809 user = User.get_cached_by_ap_id(ap_id)
1810
1811 if user && !User.ap_enabled?(user) do
1812 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1813 else
1814 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1815 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1816
1817 if user do
1818 user
1819 |> User.remote_user_changeset(data)
1820 |> User.update_and_set_cache()
1821 else
1822 maybe_handle_clashing_nickname(data)
1823
1824 data
1825 |> User.remote_user_changeset()
1826 |> Repo.insert()
1827 |> User.set_cache()
1828 end
1829 end
1830 end
1831 end
1832
1833 def make_user_from_nickname(nickname) do
1834 with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1835 WebFinger.finger(nickname) do
1836 make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1837 else
1838 _e -> {:error, "No AP id in WebFinger"}
1839 end
1840 end
1841
1842 # filter out broken threads
1843 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1844 entire_thread_visible_for_user?(activity, user)
1845 end
1846
1847 # do post-processing on a specific activity
1848 def contain_activity(%Activity{} = activity, %User{} = user) do
1849 contain_broken_threads(activity, user)
1850 end
1851
1852 def fetch_direct_messages_query do
1853 Activity
1854 |> restrict_type(%{type: "Create"})
1855 |> restrict_visibility(%{visibility: "direct"})
1856 |> order_by([activity], asc: activity.id)
1857 end
1858
1859 defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
1860
1861 defp maybe_restrict_deactivated_users(activity, _opts),
1862 do: Activity.restrict_deactivated_users(activity)
1863 end