Make pagination type conditional
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
43 {recipients, to, cc}
44 end
45
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
51 {recipients, to, cc}
52 end
53
54 defp check_actor_is_active(nil), do: true
55
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
59 _ -> false
60 end
61 end
62
63 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
64 limit = Config.get([:instance, :remote_limit])
65 String.length(content) <= limit
66 end
67
68 defp check_remote_limit(_), do: true
69
70 def increase_note_count_if_public(actor, object) do
71 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 end
73
74 def decrease_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 end
77
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 "type" => "Create"
81 }) do
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
84 end
85 end
86
87 defp increase_replies_count_if_reply(_create_data), do: :noop
88
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
90 @impl true
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
93 {:ok, object, meta}
94 end
95 end
96
97 @impl true
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
101 {:ok, activity} <-
102 Repo.insert(%Activity{
103 data: object,
104 local: local,
105 recipients: recipients,
106 actor: object["actor"]
107 }),
108 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
111 end
112 end
113
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
128
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
131 end)
132
133 {:ok, activity}
134 else
135 %Activity{} = activity ->
136 {:ok, activity}
137
138 {:actor_check, _} ->
139 {:error, false}
140
141 {:containment, _} = error ->
142 error
143
144 {:error, _} = error ->
145 error
146
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
149 data: map,
150 local: local,
151 actor: map["actor"],
152 recipients: recipients,
153 id: "pleroma:fakeid"
154 }
155
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
157 {:ok, activity}
158
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
161
162 {:reject, _} = e ->
163 {:error, e}
164 end
165 end
166
167 defp insert_activity_with_expiration(data, local, recipients) do
168 struct = %Activity{
169 data: data,
170 local: local,
171 actor: data["actor"],
172 recipients: recipients
173 }
174
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
177 end
178 end
179
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
182
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
185 stream_out(activity)
186 stream_out_participations(participations)
187 end
188
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
191 ) do
192 with {:ok, _job} <-
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
196 }) do
197 {:ok, activity}
198 end
199 end
200
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
202
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
207 {:ok, conversation}
208 end
209 end
210
211 defp get_participations({:ok, conversation}) do
212 conversation
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
215 end
216
217 defp get_participations(_), do: []
218
219 def stream_out_participations(participations) do
220 participations =
221 participations
222 |> Repo.preload(:user)
223
224 Streamer.stream("participation", participations)
225 end
226
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
230
231 last_activity_id =
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
233 user: user,
234 blocking_user: user
235 })
236
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
239 end
240 end
241 end
242
243 def stream_out_participations(_, _), do: :noop
244
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
247 activity
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
250 end
251
252 def stream_out(_activity) do
253 :noop
254 end
255
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
259 result
260 end
261 end
262
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
269
270 create_data =
271 make_create_data(
272 %{to: to, actor: actor, published: published, context: context, object: object},
273 additional
274 )
275
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
283 {:ok, activity}
284 else
285 {:quick_insert, true, activity} ->
286 {:ok, activity}
287
288 {:fake, true, activity} ->
289 {:ok, activity}
290
291 {:error, message} ->
292 Repo.rollback(message)
293 end
294 end
295
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
302
303 listen_data =
304 make_listen_data(
305 %{to: to, actor: actor, published: published, context: context, object: object},
306 additional
307 )
308
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
312 {:ok, activity}
313 end
314 end
315
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
321 result
322 end
323 end
324
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
332 {:ok, activity}
333 else
334 nil -> nil
335 {:error, error} -> Repo.rollback(error)
336 end
337 end
338
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
340 def flag(params) do
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
342 result
343 end
344 end
345
346 defp do_flag(
347 %{
348 actor: actor,
349 context: _context,
350 account: account,
351 statuses: statuses,
352 content: content
353 } = params
354 ) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
358
359 additional = params[:additional] || %{}
360
361 additional =
362 if forward do
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
364 else
365 Map.merge(additional, %{"to" => [], "cc" => []})
366 end
367
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
372 :ok <-
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
377 superuser
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
380 end)
381
382 {:ok, activity}
383 else
384 {:error, error} -> Repo.rollback(error)
385 end
386 end
387
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
390 params = %{
391 "type" => "Move",
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
395 }
396
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
401
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
405 })
406
407 {:ok, activity}
408 else
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
410 err -> err
411 end
412 end
413
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
416
417 recipients =
418 if opts[:user],
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
420 else: public
421
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
429 |> where(
430 [activity],
431 fragment(
432 "?->>'type' = ? and ?->>'context' = ?",
433 activity.data,
434 "Create",
435 activity.data,
436 ^context
437 )
438 )
439 |> exclude_poll_votes(opts)
440 |> exclude_id(opts)
441 |> order_by([activity], desc: activity.id)
442 end
443
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
446 context
447 |> fetch_activities_for_context_query(opts)
448 |> Repo.all()
449 end
450
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
454 context
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
457 |> limit(1)
458 |> select([a], a.id)
459 |> Repo.one()
460 end
461
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
465
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
470 end
471
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
474 opts
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
477 end
478
479 @valid_visibilities ~w[direct unlisted public private]
480
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
484 from(
485 a in query,
486 where:
487 fragment(
488 "activity_visibility(?, ?, ?) = ANY (?)",
489 a.actor,
490 a.recipients,
491 a.data,
492 ^visibility
493 )
494 )
495 else
496 Logger.error("Could not restrict visibility to #{visibility}")
497 end
498 end
499
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
502 from(
503 a in query,
504 where:
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
506 )
507 end
508
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
512 end
513
514 defp restrict_visibility(query, _visibility), do: query
515
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
519 from(
520 a in query,
521 where:
522 not fragment(
523 "activity_visibility(?, ?, ?) = ANY (?)",
524 a.actor,
525 a.recipients,
526 a.data,
527 ^visibility
528 )
529 )
530 else
531 Logger.error("Could not exclude visibility to #{visibility}")
532 query
533 end
534 end
535
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
538 from(
539 a in query,
540 where:
541 not fragment(
542 "activity_visibility(?, ?, ?) = ?",
543 a.actor,
544 a.recipients,
545 a.data,
546 ^visibility
547 )
548 )
549 end
550
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
554 query
555 end
556
557 defp exclude_visibility(query, _visibility), do: query
558
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
560 do: query
561
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
563 do: query
564
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
566 from(
567 a in query,
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
569 )
570 end
571
572 defp restrict_thread_visibility(query, _, _), do: query
573
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
575 params =
576 params
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
579
580 %{
581 godmode: params[:godmode],
582 reading_user: reading_user
583 }
584 |> user_activities_recipients()
585 |> fetch_activities(params)
586 |> Enum.reverse()
587 end
588
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
590 params =
591 params
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
596
597 params =
598 if User.blocks?(reading_user, user) do
599 params
600 else
601 params
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
604 end
605
606 pagination_type =
607 cond do
608 is_nil(params[:offset]) -> :keyset
609 true -> :offset
610 end
611
612 %{
613 godmode: params[:godmode],
614 reading_user: reading_user
615 }
616 |> user_activities_recipients()
617 |> fetch_activities(params, pagination_type)
618 |> Enum.reverse()
619 end
620
621 def fetch_statuses(reading_user, params) do
622 params = Map.put(params, :type, ["Create", "Announce"])
623
624 %{
625 godmode: params[:godmode],
626 reading_user: reading_user
627 }
628 |> user_activities_recipients()
629 |> fetch_activities(params, :offset)
630 |> Enum.reverse()
631 end
632
633 defp user_activities_recipients(%{godmode: true}), do: []
634
635 defp user_activities_recipients(%{reading_user: reading_user}) do
636 if reading_user do
637 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
638 else
639 [Constants.as_public()]
640 end
641 end
642
643 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
644 raise "Can't use the child object without preloading!"
645 end
646
647 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
648 from(
649 [activity, object] in query,
650 where:
651 fragment(
652 "?->>'type' != ? or ?->>'actor' != ?",
653 activity.data,
654 "Announce",
655 object.data,
656 ^actor
657 )
658 )
659 end
660
661 defp restrict_announce_object_actor(query, _), do: query
662
663 defp restrict_since(query, %{since_id: ""}), do: query
664
665 defp restrict_since(query, %{since_id: since_id}) do
666 from(activity in query, where: activity.id > ^since_id)
667 end
668
669 defp restrict_since(query, _), do: query
670
671 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
672 raise "Can't use the child object without preloading!"
673 end
674
675 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
676 from(
677 [_activity, object] in query,
678 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
679 )
680 end
681
682 defp restrict_tag_reject(query, _), do: query
683
684 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
685 raise "Can't use the child object without preloading!"
686 end
687
688 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
689 from(
690 [_activity, object] in query,
691 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
692 )
693 end
694
695 defp restrict_tag_all(query, _), do: query
696
697 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
698 raise "Can't use the child object without preloading!"
699 end
700
701 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
702 from(
703 [_activity, object] in query,
704 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
705 )
706 end
707
708 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
709 from(
710 [_activity, object] in query,
711 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
712 )
713 end
714
715 defp restrict_tag(query, _), do: query
716
717 defp restrict_recipients(query, [], _user), do: query
718
719 defp restrict_recipients(query, recipients, nil) do
720 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
721 end
722
723 defp restrict_recipients(query, recipients, user) do
724 from(
725 activity in query,
726 where: fragment("? && ?", ^recipients, activity.recipients),
727 or_where: activity.actor == ^user.ap_id
728 )
729 end
730
731 defp restrict_local(query, %{local_only: true}) do
732 from(activity in query, where: activity.local == true)
733 end
734
735 defp restrict_local(query, _), do: query
736
737 defp restrict_actor(query, %{actor_id: actor_id}) do
738 from(activity in query, where: activity.actor == ^actor_id)
739 end
740
741 defp restrict_actor(query, _), do: query
742
743 defp restrict_type(query, %{type: type}) when is_binary(type) do
744 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
745 end
746
747 defp restrict_type(query, %{type: type}) do
748 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
749 end
750
751 defp restrict_type(query, _), do: query
752
753 defp restrict_state(query, %{state: state}) do
754 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
755 end
756
757 defp restrict_state(query, _), do: query
758
759 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
760 from(
761 [_activity, object] in query,
762 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
763 )
764 end
765
766 defp restrict_favorited_by(query, _), do: query
767
768 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
769 raise "Can't use the child object without preloading!"
770 end
771
772 defp restrict_media(query, %{only_media: true}) do
773 from(
774 [activity, object] in query,
775 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
776 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
777 )
778 end
779
780 defp restrict_media(query, _), do: query
781
782 defp restrict_replies(query, %{exclude_replies: true}) do
783 from(
784 [_activity, object] in query,
785 where: fragment("?->>'inReplyTo' is null", object.data)
786 )
787 end
788
789 defp restrict_replies(query, %{
790 reply_filtering_user: %User{} = user,
791 reply_visibility: "self"
792 }) do
793 from(
794 [activity, object] in query,
795 where:
796 fragment(
797 "?->>'inReplyTo' is null OR ? = ANY(?)",
798 object.data,
799 ^user.ap_id,
800 activity.recipients
801 )
802 )
803 end
804
805 defp restrict_replies(query, %{
806 reply_filtering_user: %User{} = user,
807 reply_visibility: "following"
808 }) do
809 from(
810 [activity, object] in query,
811 where:
812 fragment(
813 """
814 ?->>'type' != 'Create' -- This isn't a Create
815 OR ?->>'inReplyTo' is null -- this isn't a reply
816 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
817 -- unless they are the author (because authors
818 -- are also part of the recipients). This leads
819 -- to a bug that self-replies by friends won't
820 -- show up.
821 OR ? = ? -- The actor is us
822 """,
823 activity.data,
824 object.data,
825 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
826 activity.recipients,
827 activity.actor,
828 activity.actor,
829 ^user.ap_id
830 )
831 )
832 end
833
834 defp restrict_replies(query, _), do: query
835
836 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
837 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
838 end
839
840 defp restrict_reblogs(query, _), do: query
841
842 defp restrict_muted(query, %{with_muted: true}), do: query
843
844 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
845 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
846
847 query =
848 from([activity] in query,
849 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
850 where:
851 fragment(
852 "not (?->'to' \\?| ?) or ? = ?",
853 activity.data,
854 ^mutes,
855 activity.actor,
856 ^user.ap_id
857 )
858 )
859
860 unless opts[:skip_preload] do
861 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
862 else
863 query
864 end
865 end
866
867 defp restrict_muted(query, _), do: query
868
869 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
870 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
871 domain_blocks = user.domain_blocks || []
872
873 following_ap_ids = User.get_friends_ap_ids(user)
874
875 query =
876 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
877
878 from(
879 [activity, object: o] in query,
880 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
881 where:
882 fragment(
883 "((not (? && ?)) or ? = ?)",
884 activity.recipients,
885 ^blocked_ap_ids,
886 activity.actor,
887 ^user.ap_id
888 ),
889 where:
890 fragment(
891 "recipients_contain_blocked_domains(?, ?) = false",
892 activity.recipients,
893 ^domain_blocks
894 ),
895 where:
896 fragment(
897 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
898 activity.data,
899 activity.data,
900 ^blocked_ap_ids
901 ),
902 where:
903 fragment(
904 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
905 activity.actor,
906 ^domain_blocks,
907 activity.actor,
908 ^following_ap_ids
909 ),
910 where:
911 fragment(
912 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
913 o.data,
914 ^domain_blocks,
915 o.data,
916 ^following_ap_ids
917 )
918 )
919 end
920
921 defp restrict_blocked(query, _), do: query
922
923 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
924 from(
925 activity in query,
926 where:
927 fragment(
928 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
929 activity.data,
930 ^[Constants.as_public()]
931 )
932 )
933 end
934
935 defp restrict_unlisted(query, _), do: query
936
937 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
938 from(activity in query, where: activity.id in ^ids)
939 end
940
941 defp restrict_pinned(query, _), do: query
942
943 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
944 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
945
946 from(
947 activity in query,
948 where:
949 fragment(
950 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
951 activity.data,
952 activity.actor,
953 ^muted_reblogs
954 )
955 )
956 end
957
958 defp restrict_muted_reblogs(query, _), do: query
959
960 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
961 from(
962 activity in query,
963 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
964 )
965 end
966
967 defp restrict_instance(query, _), do: query
968
969 defp restrict_filtered(query, %{user: %User{} = user}) do
970 case Filter.compose_regex(user) do
971 nil ->
972 query
973
974 regex ->
975 from([activity, object] in query,
976 where:
977 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
978 activity.actor == ^user.ap_id
979 )
980 end
981 end
982
983 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
984 restrict_filtered(query, %{user: user})
985 end
986
987 defp restrict_filtered(query, _), do: query
988
989 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
990
991 defp exclude_poll_votes(query, _) do
992 if has_named_binding?(query, :object) do
993 from([activity, object: o] in query,
994 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
995 )
996 else
997 query
998 end
999 end
1000
1001 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1002
1003 defp exclude_chat_messages(query, _) do
1004 if has_named_binding?(query, :object) do
1005 from([activity, object: o] in query,
1006 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1007 )
1008 else
1009 query
1010 end
1011 end
1012
1013 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1014
1015 defp exclude_invisible_actors(query, _opts) do
1016 invisible_ap_ids =
1017 User.Query.build(%{invisible: true, select: [:ap_id]})
1018 |> Repo.all()
1019 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1020
1021 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1022 end
1023
1024 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1025 from(activity in query, where: activity.id != ^id)
1026 end
1027
1028 defp exclude_id(query, _), do: query
1029
1030 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1031
1032 defp maybe_preload_objects(query, _) do
1033 query
1034 |> Activity.with_preloaded_object()
1035 end
1036
1037 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1038
1039 defp maybe_preload_bookmarks(query, opts) do
1040 query
1041 |> Activity.with_preloaded_bookmark(opts[:user])
1042 end
1043
1044 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1045 query
1046 |> Activity.with_preloaded_report_notes()
1047 end
1048
1049 defp maybe_preload_report_notes(query, _), do: query
1050
1051 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1052
1053 defp maybe_set_thread_muted_field(query, opts) do
1054 query
1055 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1056 end
1057
1058 defp maybe_order(query, %{order: :desc}) do
1059 query
1060 |> order_by(desc: :id)
1061 end
1062
1063 defp maybe_order(query, %{order: :asc}) do
1064 query
1065 |> order_by(asc: :id)
1066 end
1067
1068 defp maybe_order(query, _), do: query
1069
1070 defp fetch_activities_query_ap_ids_ops(opts) do
1071 source_user = opts[:muting_user]
1072 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1073
1074 ap_id_relationships =
1075 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1076 [:block | ap_id_relationships]
1077 else
1078 ap_id_relationships
1079 end
1080
1081 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1082
1083 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1084 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1085
1086 restrict_muted_reblogs_opts =
1087 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1088
1089 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1090 end
1091
1092 def fetch_activities_query(recipients, opts \\ %{}) do
1093 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1094 fetch_activities_query_ap_ids_ops(opts)
1095
1096 config = %{
1097 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1098 }
1099
1100 Activity
1101 |> maybe_preload_objects(opts)
1102 |> maybe_preload_bookmarks(opts)
1103 |> maybe_preload_report_notes(opts)
1104 |> maybe_set_thread_muted_field(opts)
1105 |> maybe_order(opts)
1106 |> restrict_recipients(recipients, opts[:user])
1107 |> restrict_replies(opts)
1108 |> restrict_tag(opts)
1109 |> restrict_tag_reject(opts)
1110 |> restrict_tag_all(opts)
1111 |> restrict_since(opts)
1112 |> restrict_local(opts)
1113 |> restrict_actor(opts)
1114 |> restrict_type(opts)
1115 |> restrict_state(opts)
1116 |> restrict_favorited_by(opts)
1117 |> restrict_blocked(restrict_blocked_opts)
1118 |> restrict_muted(restrict_muted_opts)
1119 |> restrict_filtered(opts)
1120 |> restrict_media(opts)
1121 |> restrict_visibility(opts)
1122 |> restrict_thread_visibility(opts, config)
1123 |> restrict_reblogs(opts)
1124 |> restrict_pinned(opts)
1125 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1126 |> restrict_instance(opts)
1127 |> restrict_announce_object_actor(opts)
1128 |> restrict_filtered(opts)
1129 |> Activity.restrict_deactivated_users()
1130 |> exclude_poll_votes(opts)
1131 |> exclude_chat_messages(opts)
1132 |> exclude_invisible_actors(opts)
1133 |> exclude_visibility(opts)
1134 end
1135
1136 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1137 list_memberships = Pleroma.List.memberships(opts[:user])
1138
1139 fetch_activities_query(recipients ++ list_memberships, opts)
1140 |> Pagination.fetch_paginated(opts, pagination)
1141 |> Enum.reverse()
1142 |> maybe_update_cc(list_memberships, opts[:user])
1143 end
1144
1145 @doc """
1146 Fetch favorites activities of user with order by sort adds to favorites
1147 """
1148 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1149 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1150 user.ap_id
1151 |> Activity.Queries.by_actor()
1152 |> Activity.Queries.by_type("Like")
1153 |> Activity.with_joined_object()
1154 |> Object.with_joined_activity()
1155 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1156 |> order_by([like, _, _], desc_nulls_last: like.id)
1157 |> Pagination.fetch_paginated(
1158 Map.merge(params, %{skip_order: true}),
1159 pagination
1160 )
1161 end
1162
1163 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1164 Enum.map(activities, fn
1165 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1166 if Enum.any?(bcc, &(&1 in list_memberships)) do
1167 update_in(activity.data["cc"], &[user_ap_id | &1])
1168 else
1169 activity
1170 end
1171
1172 activity ->
1173 activity
1174 end)
1175 end
1176
1177 defp maybe_update_cc(activities, _, _), do: activities
1178
1179 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1180 from(activity in query,
1181 where:
1182 fragment("? && ?", activity.recipients, ^recipients) or
1183 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1184 ^Constants.as_public() in activity.recipients)
1185 )
1186 end
1187
1188 def fetch_activities_bounded(
1189 recipients,
1190 recipients_with_public,
1191 opts \\ %{},
1192 pagination \\ :keyset
1193 ) do
1194 fetch_activities_query([], opts)
1195 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1196 |> Pagination.fetch_paginated(opts, pagination)
1197 |> Enum.reverse()
1198 end
1199
1200 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1201 def upload(file, opts \\ []) do
1202 with {:ok, data} <- Upload.store(file, opts) do
1203 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1204
1205 Repo.insert(%Object{data: obj_data})
1206 end
1207 end
1208
1209 @spec get_actor_url(any()) :: binary() | nil
1210 defp get_actor_url(url) when is_binary(url), do: url
1211 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1212
1213 defp get_actor_url(url) when is_list(url) do
1214 url
1215 |> List.first()
1216 |> get_actor_url()
1217 end
1218
1219 defp get_actor_url(_url), do: nil
1220
1221 defp object_to_user_data(data) do
1222 avatar =
1223 data["icon"]["url"] &&
1224 %{
1225 "type" => "Image",
1226 "url" => [%{"href" => data["icon"]["url"]}]
1227 }
1228
1229 banner =
1230 data["image"]["url"] &&
1231 %{
1232 "type" => "Image",
1233 "url" => [%{"href" => data["image"]["url"]}]
1234 }
1235
1236 fields =
1237 data
1238 |> Map.get("attachment", [])
1239 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1240 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1241
1242 emojis =
1243 data
1244 |> Map.get("tag", [])
1245 |> Enum.filter(fn
1246 %{"type" => "Emoji"} -> true
1247 _ -> false
1248 end)
1249 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1250 {String.trim(name, ":"), url}
1251 end)
1252
1253 is_locked = data["manuallyApprovesFollowers"] || false
1254 capabilities = data["capabilities"] || %{}
1255 accepts_chat_messages = capabilities["acceptsChatMessages"]
1256 data = Transmogrifier.maybe_fix_user_object(data)
1257 is_discoverable = data["discoverable"] || false
1258 invisible = data["invisible"] || false
1259 actor_type = data["type"] || "Person"
1260
1261 public_key =
1262 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1263 data["publicKey"]["publicKeyPem"]
1264 else
1265 nil
1266 end
1267
1268 shared_inbox =
1269 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1270 data["endpoints"]["sharedInbox"]
1271 else
1272 nil
1273 end
1274
1275 user_data = %{
1276 ap_id: data["id"],
1277 uri: get_actor_url(data["url"]),
1278 ap_enabled: true,
1279 banner: banner,
1280 fields: fields,
1281 emoji: emojis,
1282 is_locked: is_locked,
1283 is_discoverable: is_discoverable,
1284 invisible: invisible,
1285 avatar: avatar,
1286 name: data["name"],
1287 follower_address: data["followers"],
1288 following_address: data["following"],
1289 bio: data["summary"] || "",
1290 actor_type: actor_type,
1291 also_known_as: Map.get(data, "alsoKnownAs", []),
1292 public_key: public_key,
1293 inbox: data["inbox"],
1294 shared_inbox: shared_inbox,
1295 accepts_chat_messages: accepts_chat_messages
1296 }
1297
1298 # nickname can be nil because of virtual actors
1299 if data["preferredUsername"] do
1300 Map.put(
1301 user_data,
1302 :nickname,
1303 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1304 )
1305 else
1306 Map.put(user_data, :nickname, nil)
1307 end
1308 end
1309
1310 def fetch_follow_information_for_user(user) do
1311 with {:ok, following_data} <-
1312 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1313 {:ok, hide_follows} <- collection_private(following_data),
1314 {:ok, followers_data} <-
1315 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1316 {:ok, hide_followers} <- collection_private(followers_data) do
1317 {:ok,
1318 %{
1319 hide_follows: hide_follows,
1320 follower_count: normalize_counter(followers_data["totalItems"]),
1321 following_count: normalize_counter(following_data["totalItems"]),
1322 hide_followers: hide_followers
1323 }}
1324 else
1325 {:error, _} = e -> e
1326 e -> {:error, e}
1327 end
1328 end
1329
1330 defp normalize_counter(counter) when is_integer(counter), do: counter
1331 defp normalize_counter(_), do: 0
1332
1333 def maybe_update_follow_information(user_data) do
1334 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1335 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1336 {_, true} <-
1337 {:collections_available,
1338 !!(user_data[:following_address] && user_data[:follower_address])},
1339 {:ok, info} <-
1340 fetch_follow_information_for_user(user_data) do
1341 info = Map.merge(user_data[:info] || %{}, info)
1342
1343 user_data
1344 |> Map.put(:info, info)
1345 else
1346 {:user_type_check, false} ->
1347 user_data
1348
1349 {:collections_available, false} ->
1350 user_data
1351
1352 {:enabled, false} ->
1353 user_data
1354
1355 e ->
1356 Logger.error(
1357 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1358 )
1359
1360 user_data
1361 end
1362 end
1363
1364 defp collection_private(%{"first" => %{"type" => type}})
1365 when type in ["CollectionPage", "OrderedCollectionPage"],
1366 do: {:ok, false}
1367
1368 defp collection_private(%{"first" => first}) do
1369 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1370 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1371 {:ok, false}
1372 else
1373 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1374 {:error, _} = e -> e
1375 e -> {:error, e}
1376 end
1377 end
1378
1379 defp collection_private(_data), do: {:ok, true}
1380
1381 def user_data_from_user_object(data) do
1382 with {:ok, data} <- MRF.filter(data) do
1383 {:ok, object_to_user_data(data)}
1384 else
1385 e -> {:error, e}
1386 end
1387 end
1388
1389 def fetch_and_prepare_user_from_ap_id(ap_id) do
1390 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1391 {:ok, data} <- user_data_from_user_object(data) do
1392 {:ok, maybe_update_follow_information(data)}
1393 else
1394 # If this has been deleted, only log a debug and not an error
1395 {:error, "Object has been deleted" = e} ->
1396 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1397 {:error, e}
1398
1399 {:error, {:reject, reason} = e} ->
1400 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1401 {:error, e}
1402
1403 {:error, e} ->
1404 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1405 {:error, e}
1406 end
1407 end
1408
1409 def maybe_handle_clashing_nickname(data) do
1410 with nickname when is_binary(nickname) <- data[:nickname],
1411 %User{} = old_user <- User.get_by_nickname(nickname),
1412 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1413 Logger.info(
1414 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1415 data[:ap_id]
1416 }, renaming."
1417 )
1418
1419 old_user
1420 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1421 |> User.update_and_set_cache()
1422 else
1423 {:ap_id_comparison, true} ->
1424 Logger.info(
1425 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1426 )
1427
1428 _ ->
1429 nil
1430 end
1431 end
1432
1433 def make_user_from_ap_id(ap_id) do
1434 user = User.get_cached_by_ap_id(ap_id)
1435
1436 if user && !User.ap_enabled?(user) do
1437 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1438 else
1439 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1440 if user do
1441 user
1442 |> User.remote_user_changeset(data)
1443 |> User.update_and_set_cache()
1444 else
1445 maybe_handle_clashing_nickname(data)
1446
1447 data
1448 |> User.remote_user_changeset()
1449 |> Repo.insert()
1450 |> User.set_cache()
1451 end
1452 end
1453 end
1454 end
1455
1456 def make_user_from_nickname(nickname) do
1457 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1458 make_user_from_ap_id(ap_id)
1459 else
1460 _e -> {:error, "No AP id in WebFinger"}
1461 end
1462 end
1463
1464 # filter out broken threads
1465 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1466 entire_thread_visible_for_user?(activity, user)
1467 end
1468
1469 # do post-processing on a specific activity
1470 def contain_activity(%Activity{} = activity, %User{} = user) do
1471 contain_broken_threads(activity, user)
1472 end
1473
1474 def fetch_direct_messages_query do
1475 Activity
1476 |> restrict_type(%{type: "Create"})
1477 |> restrict_visibility(%{visibility: "direct"})
1478 |> order_by([activity], asc: activity.id)
1479 end
1480 end