9db72d6b2bb29e18e219bc2ba03ef6db509f6b29
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
43 {recipients, to, cc}
44 end
45
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
51 {recipients, to, cc}
52 end
53
54 defp check_actor_is_active(nil), do: true
55
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
59 _ -> false
60 end
61 end
62
63 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
64 limit = Config.get([:instance, :remote_limit])
65 String.length(content) <= limit
66 end
67
68 defp check_remote_limit(_), do: true
69
70 def increase_note_count_if_public(actor, object) do
71 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 end
73
74 def decrease_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 end
77
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 "type" => "Create"
81 }) do
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
84 end
85 end
86
87 defp increase_replies_count_if_reply(_create_data), do: :noop
88
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
90 @impl true
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
93 {:ok, object, meta}
94 end
95 end
96
97 @impl true
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
101 {:ok, activity} <-
102 Repo.insert(%Activity{
103 data: object,
104 local: local,
105 recipients: recipients,
106 actor: object["actor"]
107 }),
108 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
111 end
112 end
113
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
128
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
131 end)
132
133 {:ok, activity}
134 else
135 %Activity{} = activity ->
136 {:ok, activity}
137
138 {:actor_check, _} ->
139 {:error, false}
140
141 {:containment, _} = error ->
142 error
143
144 {:error, _} = error ->
145 error
146
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
149 data: map,
150 local: local,
151 actor: map["actor"],
152 recipients: recipients,
153 id: "pleroma:fakeid"
154 }
155
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
157 {:ok, activity}
158
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
161
162 {:reject, _} = e ->
163 {:error, e}
164 end
165 end
166
167 defp insert_activity_with_expiration(data, local, recipients) do
168 struct = %Activity{
169 data: data,
170 local: local,
171 actor: data["actor"],
172 recipients: recipients
173 }
174
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
177 end
178 end
179
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
182
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
185 stream_out(activity)
186 stream_out_participations(participations)
187 end
188
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
191 ) do
192 with {:ok, _job} <-
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
196 }) do
197 {:ok, activity}
198 end
199 end
200
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
202
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
207 {:ok, conversation}
208 end
209 end
210
211 defp get_participations({:ok, conversation}) do
212 conversation
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
215 end
216
217 defp get_participations(_), do: []
218
219 def stream_out_participations(participations) do
220 participations =
221 participations
222 |> Repo.preload(:user)
223
224 Streamer.stream("participation", participations)
225 end
226
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
230
231 last_activity_id =
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
233 user: user,
234 blocking_user: user
235 })
236
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
239 end
240 end
241 end
242
243 def stream_out_participations(_, _), do: :noop
244
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
247 activity
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
250 end
251
252 def stream_out(_activity) do
253 :noop
254 end
255
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
259 result
260 end
261 end
262
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
269
270 create_data =
271 make_create_data(
272 %{to: to, actor: actor, published: published, context: context, object: object},
273 additional
274 )
275
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
283 {:ok, activity}
284 else
285 {:quick_insert, true, activity} ->
286 {:ok, activity}
287
288 {:fake, true, activity} ->
289 {:ok, activity}
290
291 {:error, message} ->
292 Repo.rollback(message)
293 end
294 end
295
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
302
303 listen_data =
304 make_listen_data(
305 %{to: to, actor: actor, published: published, context: context, object: object},
306 additional
307 )
308
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
312 {:ok, activity}
313 end
314 end
315
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
321 result
322 end
323 end
324
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
332 {:ok, activity}
333 else
334 nil -> nil
335 {:error, error} -> Repo.rollback(error)
336 end
337 end
338
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
340 def flag(params) do
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
342 result
343 end
344 end
345
346 defp do_flag(
347 %{
348 actor: actor,
349 context: _context,
350 account: account,
351 statuses: statuses,
352 content: content
353 } = params
354 ) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
358
359 additional = params[:additional] || %{}
360
361 additional =
362 if forward do
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
364 else
365 Map.merge(additional, %{"to" => [], "cc" => []})
366 end
367
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
372 :ok <-
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
377 superuser
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
380 end)
381
382 {:ok, activity}
383 else
384 {:error, error} -> Repo.rollback(error)
385 end
386 end
387
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
390 params = %{
391 "type" => "Move",
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
395 }
396
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
401
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
405 })
406
407 {:ok, activity}
408 else
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
410 err -> err
411 end
412 end
413
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
416
417 recipients =
418 if opts[:user],
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
420 else: public
421
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
429 |> where(
430 [activity],
431 fragment(
432 "?->>'type' = ? and ?->>'context' = ?",
433 activity.data,
434 "Create",
435 activity.data,
436 ^context
437 )
438 )
439 |> exclude_poll_votes(opts)
440 |> exclude_id(opts)
441 |> order_by([activity], desc: activity.id)
442 end
443
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
446 context
447 |> fetch_activities_for_context_query(opts)
448 |> Repo.all()
449 end
450
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
454 context
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
457 |> limit(1)
458 |> select([a], a.id)
459 |> Repo.one()
460 end
461
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
465
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
470 end
471
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
474 opts
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
477 end
478
479 @valid_visibilities ~w[direct unlisted public private]
480
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
484 from(
485 a in query,
486 where:
487 fragment(
488 "activity_visibility(?, ?, ?) = ANY (?)",
489 a.actor,
490 a.recipients,
491 a.data,
492 ^visibility
493 )
494 )
495 else
496 Logger.error("Could not restrict visibility to #{visibility}")
497 end
498 end
499
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
502 from(
503 a in query,
504 where:
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
506 )
507 end
508
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
512 end
513
514 defp restrict_visibility(query, _visibility), do: query
515
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
519 from(
520 a in query,
521 where:
522 not fragment(
523 "activity_visibility(?, ?, ?) = ANY (?)",
524 a.actor,
525 a.recipients,
526 a.data,
527 ^visibility
528 )
529 )
530 else
531 Logger.error("Could not exclude visibility to #{visibility}")
532 query
533 end
534 end
535
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
538 from(
539 a in query,
540 where:
541 not fragment(
542 "activity_visibility(?, ?, ?) = ?",
543 a.actor,
544 a.recipients,
545 a.data,
546 ^visibility
547 )
548 )
549 end
550
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
554 query
555 end
556
557 defp exclude_visibility(query, _visibility), do: query
558
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
560 do: query
561
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
563 do: query
564
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
566 from(
567 a in query,
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
569 )
570 end
571
572 defp restrict_thread_visibility(query, _, _), do: query
573
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
575 params =
576 params
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
579
580 %{
581 godmode: params[:godmode],
582 reading_user: reading_user
583 }
584 |> user_activities_recipients()
585 |> fetch_activities(params)
586 |> Enum.reverse()
587 end
588
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
590 params =
591 params
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
596
597 params =
598 if User.blocks?(reading_user, user) do
599 params
600 else
601 params
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
604 end
605
606 pagination_type = Map.get(params, :pagination_type) || :keyset
607
608 %{
609 godmode: params[:godmode],
610 reading_user: reading_user
611 }
612 |> user_activities_recipients()
613 |> fetch_activities(params, pagination_type)
614 |> Enum.reverse()
615 end
616
617 def fetch_statuses(reading_user, params) do
618 params = Map.put(params, :type, ["Create", "Announce"])
619
620 %{
621 godmode: params[:godmode],
622 reading_user: reading_user
623 }
624 |> user_activities_recipients()
625 |> fetch_activities(params, :offset)
626 |> Enum.reverse()
627 end
628
629 defp user_activities_recipients(%{godmode: true}), do: []
630
631 defp user_activities_recipients(%{reading_user: reading_user}) do
632 if reading_user do
633 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
634 else
635 [Constants.as_public()]
636 end
637 end
638
639 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
640 raise "Can't use the child object without preloading!"
641 end
642
643 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
644 from(
645 [activity, object] in query,
646 where:
647 fragment(
648 "?->>'type' != ? or ?->>'actor' != ?",
649 activity.data,
650 "Announce",
651 object.data,
652 ^actor
653 )
654 )
655 end
656
657 defp restrict_announce_object_actor(query, _), do: query
658
659 defp restrict_since(query, %{since_id: ""}), do: query
660
661 defp restrict_since(query, %{since_id: since_id}) do
662 from(activity in query, where: activity.id > ^since_id)
663 end
664
665 defp restrict_since(query, _), do: query
666
667 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
668 raise "Can't use the child object without preloading!"
669 end
670
671 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
672 from(
673 [_activity, object] in query,
674 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
675 )
676 end
677
678 defp restrict_tag_reject(query, _), do: query
679
680 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
681 raise "Can't use the child object without preloading!"
682 end
683
684 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
685 from(
686 [_activity, object] in query,
687 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
688 )
689 end
690
691 defp restrict_tag_all(query, _), do: query
692
693 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
694 raise "Can't use the child object without preloading!"
695 end
696
697 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
698 from(
699 [_activity, object] in query,
700 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
701 )
702 end
703
704 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
705 from(
706 [_activity, object] in query,
707 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
708 )
709 end
710
711 defp restrict_tag(query, _), do: query
712
713 defp restrict_recipients(query, [], _user), do: query
714
715 defp restrict_recipients(query, recipients, nil) do
716 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
717 end
718
719 defp restrict_recipients(query, recipients, user) do
720 from(
721 activity in query,
722 where: fragment("? && ?", ^recipients, activity.recipients),
723 or_where: activity.actor == ^user.ap_id
724 )
725 end
726
727 defp restrict_local(query, %{local_only: true}) do
728 from(activity in query, where: activity.local == true)
729 end
730
731 defp restrict_local(query, _), do: query
732
733 defp restrict_actor(query, %{actor_id: actor_id}) do
734 from(activity in query, where: activity.actor == ^actor_id)
735 end
736
737 defp restrict_actor(query, _), do: query
738
739 defp restrict_type(query, %{type: type}) when is_binary(type) do
740 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
741 end
742
743 defp restrict_type(query, %{type: type}) do
744 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
745 end
746
747 defp restrict_type(query, _), do: query
748
749 defp restrict_state(query, %{state: state}) do
750 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
751 end
752
753 defp restrict_state(query, _), do: query
754
755 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
756 from(
757 [_activity, object] in query,
758 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
759 )
760 end
761
762 defp restrict_favorited_by(query, _), do: query
763
764 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
765 raise "Can't use the child object without preloading!"
766 end
767
768 defp restrict_media(query, %{only_media: true}) do
769 from(
770 [activity, object] in query,
771 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
772 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
773 )
774 end
775
776 defp restrict_media(query, _), do: query
777
778 defp restrict_replies(query, %{exclude_replies: true}) do
779 from(
780 [_activity, object] in query,
781 where: fragment("?->>'inReplyTo' is null", object.data)
782 )
783 end
784
785 defp restrict_replies(query, %{
786 reply_filtering_user: %User{} = user,
787 reply_visibility: "self"
788 }) do
789 from(
790 [activity, object] in query,
791 where:
792 fragment(
793 "?->>'inReplyTo' is null OR ? = ANY(?)",
794 object.data,
795 ^user.ap_id,
796 activity.recipients
797 )
798 )
799 end
800
801 defp restrict_replies(query, %{
802 reply_filtering_user: %User{} = user,
803 reply_visibility: "following"
804 }) do
805 from(
806 [activity, object] in query,
807 where:
808 fragment(
809 """
810 ?->>'type' != 'Create' -- This isn't a Create
811 OR ?->>'inReplyTo' is null -- this isn't a reply
812 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
813 -- unless they are the author (because authors
814 -- are also part of the recipients). This leads
815 -- to a bug that self-replies by friends won't
816 -- show up.
817 OR ? = ? -- The actor is us
818 """,
819 activity.data,
820 object.data,
821 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
822 activity.recipients,
823 activity.actor,
824 activity.actor,
825 ^user.ap_id
826 )
827 )
828 end
829
830 defp restrict_replies(query, _), do: query
831
832 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
833 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
834 end
835
836 defp restrict_reblogs(query, _), do: query
837
838 defp restrict_muted(query, %{with_muted: true}), do: query
839
840 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
841 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
842
843 query =
844 from([activity] in query,
845 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
846 where:
847 fragment(
848 "not (?->'to' \\?| ?) or ? = ?",
849 activity.data,
850 ^mutes,
851 activity.actor,
852 ^user.ap_id
853 )
854 )
855
856 unless opts[:skip_preload] do
857 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
858 else
859 query
860 end
861 end
862
863 defp restrict_muted(query, _), do: query
864
865 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
866 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
867 domain_blocks = user.domain_blocks || []
868
869 following_ap_ids = User.get_friends_ap_ids(user)
870
871 query =
872 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
873
874 from(
875 [activity, object: o] in query,
876 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
877 where:
878 fragment(
879 "((not (? && ?)) or ? = ?)",
880 activity.recipients,
881 ^blocked_ap_ids,
882 activity.actor,
883 ^user.ap_id
884 ),
885 where:
886 fragment(
887 "recipients_contain_blocked_domains(?, ?) = false",
888 activity.recipients,
889 ^domain_blocks
890 ),
891 where:
892 fragment(
893 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
894 activity.data,
895 activity.data,
896 ^blocked_ap_ids
897 ),
898 where:
899 fragment(
900 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
901 activity.actor,
902 ^domain_blocks,
903 activity.actor,
904 ^following_ap_ids
905 ),
906 where:
907 fragment(
908 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
909 o.data,
910 ^domain_blocks,
911 o.data,
912 ^following_ap_ids
913 )
914 )
915 end
916
917 defp restrict_blocked(query, _), do: query
918
919 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
920 from(
921 activity in query,
922 where:
923 fragment(
924 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
925 activity.data,
926 ^[Constants.as_public()]
927 )
928 )
929 end
930
931 defp restrict_unlisted(query, _), do: query
932
933 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
934 from(activity in query, where: activity.id in ^ids)
935 end
936
937 defp restrict_pinned(query, _), do: query
938
939 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
940 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
941
942 from(
943 activity in query,
944 where:
945 fragment(
946 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
947 activity.data,
948 activity.actor,
949 ^muted_reblogs
950 )
951 )
952 end
953
954 defp restrict_muted_reblogs(query, _), do: query
955
956 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
957 from(
958 activity in query,
959 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
960 )
961 end
962
963 defp restrict_instance(query, _), do: query
964
965 defp restrict_filtered(query, %{user: %User{} = user}) do
966 case Filter.compose_regex(user) do
967 nil ->
968 query
969
970 regex ->
971 from([activity, object] in query,
972 where:
973 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
974 activity.actor == ^user.ap_id
975 )
976 end
977 end
978
979 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
980 restrict_filtered(query, %{user: user})
981 end
982
983 defp restrict_filtered(query, _), do: query
984
985 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
986
987 defp exclude_poll_votes(query, _) do
988 if has_named_binding?(query, :object) do
989 from([activity, object: o] in query,
990 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
991 )
992 else
993 query
994 end
995 end
996
997 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
998
999 defp exclude_chat_messages(query, _) do
1000 if has_named_binding?(query, :object) do
1001 from([activity, object: o] in query,
1002 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1003 )
1004 else
1005 query
1006 end
1007 end
1008
1009 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1010
1011 defp exclude_invisible_actors(query, _opts) do
1012 invisible_ap_ids =
1013 User.Query.build(%{invisible: true, select: [:ap_id]})
1014 |> Repo.all()
1015 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1016
1017 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1018 end
1019
1020 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1021 from(activity in query, where: activity.id != ^id)
1022 end
1023
1024 defp exclude_id(query, _), do: query
1025
1026 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1027
1028 defp maybe_preload_objects(query, _) do
1029 query
1030 |> Activity.with_preloaded_object()
1031 end
1032
1033 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1034
1035 defp maybe_preload_bookmarks(query, opts) do
1036 query
1037 |> Activity.with_preloaded_bookmark(opts[:user])
1038 end
1039
1040 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1041 query
1042 |> Activity.with_preloaded_report_notes()
1043 end
1044
1045 defp maybe_preload_report_notes(query, _), do: query
1046
1047 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1048
1049 defp maybe_set_thread_muted_field(query, opts) do
1050 query
1051 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1052 end
1053
1054 defp maybe_order(query, %{order: :desc}) do
1055 query
1056 |> order_by(desc: :id)
1057 end
1058
1059 defp maybe_order(query, %{order: :asc}) do
1060 query
1061 |> order_by(asc: :id)
1062 end
1063
1064 defp maybe_order(query, _), do: query
1065
1066 defp fetch_activities_query_ap_ids_ops(opts) do
1067 source_user = opts[:muting_user]
1068 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1069
1070 ap_id_relationships =
1071 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1072 [:block | ap_id_relationships]
1073 else
1074 ap_id_relationships
1075 end
1076
1077 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1078
1079 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1080 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1081
1082 restrict_muted_reblogs_opts =
1083 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1084
1085 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1086 end
1087
1088 def fetch_activities_query(recipients, opts \\ %{}) do
1089 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1090 fetch_activities_query_ap_ids_ops(opts)
1091
1092 config = %{
1093 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1094 }
1095
1096 Activity
1097 |> maybe_preload_objects(opts)
1098 |> maybe_preload_bookmarks(opts)
1099 |> maybe_preload_report_notes(opts)
1100 |> maybe_set_thread_muted_field(opts)
1101 |> maybe_order(opts)
1102 |> restrict_recipients(recipients, opts[:user])
1103 |> restrict_replies(opts)
1104 |> restrict_tag(opts)
1105 |> restrict_tag_reject(opts)
1106 |> restrict_tag_all(opts)
1107 |> restrict_since(opts)
1108 |> restrict_local(opts)
1109 |> restrict_actor(opts)
1110 |> restrict_type(opts)
1111 |> restrict_state(opts)
1112 |> restrict_favorited_by(opts)
1113 |> restrict_blocked(restrict_blocked_opts)
1114 |> restrict_muted(restrict_muted_opts)
1115 |> restrict_filtered(opts)
1116 |> restrict_media(opts)
1117 |> restrict_visibility(opts)
1118 |> restrict_thread_visibility(opts, config)
1119 |> restrict_reblogs(opts)
1120 |> restrict_pinned(opts)
1121 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1122 |> restrict_instance(opts)
1123 |> restrict_announce_object_actor(opts)
1124 |> restrict_filtered(opts)
1125 |> Activity.restrict_deactivated_users()
1126 |> exclude_poll_votes(opts)
1127 |> exclude_chat_messages(opts)
1128 |> exclude_invisible_actors(opts)
1129 |> exclude_visibility(opts)
1130 end
1131
1132 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1133 list_memberships = Pleroma.List.memberships(opts[:user])
1134
1135 fetch_activities_query(recipients ++ list_memberships, opts)
1136 |> Pagination.fetch_paginated(opts, pagination)
1137 |> Enum.reverse()
1138 |> maybe_update_cc(list_memberships, opts[:user])
1139 end
1140
1141 @doc """
1142 Fetch favorites activities of user with order by sort adds to favorites
1143 """
1144 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1145 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1146 user.ap_id
1147 |> Activity.Queries.by_actor()
1148 |> Activity.Queries.by_type("Like")
1149 |> Activity.with_joined_object()
1150 |> Object.with_joined_activity()
1151 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1152 |> order_by([like, _, _], desc_nulls_last: like.id)
1153 |> Pagination.fetch_paginated(
1154 Map.merge(params, %{skip_order: true}),
1155 pagination
1156 )
1157 end
1158
1159 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1160 Enum.map(activities, fn
1161 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1162 if Enum.any?(bcc, &(&1 in list_memberships)) do
1163 update_in(activity.data["cc"], &[user_ap_id | &1])
1164 else
1165 activity
1166 end
1167
1168 activity ->
1169 activity
1170 end)
1171 end
1172
1173 defp maybe_update_cc(activities, _, _), do: activities
1174
1175 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1176 from(activity in query,
1177 where:
1178 fragment("? && ?", activity.recipients, ^recipients) or
1179 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1180 ^Constants.as_public() in activity.recipients)
1181 )
1182 end
1183
1184 def fetch_activities_bounded(
1185 recipients,
1186 recipients_with_public,
1187 opts \\ %{},
1188 pagination \\ :keyset
1189 ) do
1190 fetch_activities_query([], opts)
1191 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1192 |> Pagination.fetch_paginated(opts, pagination)
1193 |> Enum.reverse()
1194 end
1195
1196 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1197 def upload(file, opts \\ []) do
1198 with {:ok, data} <- Upload.store(file, opts) do
1199 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1200
1201 Repo.insert(%Object{data: obj_data})
1202 end
1203 end
1204
1205 @spec get_actor_url(any()) :: binary() | nil
1206 defp get_actor_url(url) when is_binary(url), do: url
1207 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1208
1209 defp get_actor_url(url) when is_list(url) do
1210 url
1211 |> List.first()
1212 |> get_actor_url()
1213 end
1214
1215 defp get_actor_url(_url), do: nil
1216
1217 defp object_to_user_data(data) do
1218 avatar =
1219 data["icon"]["url"] &&
1220 %{
1221 "type" => "Image",
1222 "url" => [%{"href" => data["icon"]["url"]}]
1223 }
1224
1225 banner =
1226 data["image"]["url"] &&
1227 %{
1228 "type" => "Image",
1229 "url" => [%{"href" => data["image"]["url"]}]
1230 }
1231
1232 fields =
1233 data
1234 |> Map.get("attachment", [])
1235 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1236 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1237
1238 emojis =
1239 data
1240 |> Map.get("tag", [])
1241 |> Enum.filter(fn
1242 %{"type" => "Emoji"} -> true
1243 _ -> false
1244 end)
1245 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1246 {String.trim(name, ":"), url}
1247 end)
1248
1249 is_locked = data["manuallyApprovesFollowers"] || false
1250 capabilities = data["capabilities"] || %{}
1251 accepts_chat_messages = capabilities["acceptsChatMessages"]
1252 data = Transmogrifier.maybe_fix_user_object(data)
1253 is_discoverable = data["discoverable"] || false
1254 invisible = data["invisible"] || false
1255 actor_type = data["type"] || "Person"
1256
1257 public_key =
1258 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1259 data["publicKey"]["publicKeyPem"]
1260 else
1261 nil
1262 end
1263
1264 shared_inbox =
1265 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1266 data["endpoints"]["sharedInbox"]
1267 else
1268 nil
1269 end
1270
1271 user_data = %{
1272 ap_id: data["id"],
1273 uri: get_actor_url(data["url"]),
1274 ap_enabled: true,
1275 banner: banner,
1276 fields: fields,
1277 emoji: emojis,
1278 is_locked: is_locked,
1279 is_discoverable: is_discoverable,
1280 invisible: invisible,
1281 avatar: avatar,
1282 name: data["name"],
1283 follower_address: data["followers"],
1284 following_address: data["following"],
1285 bio: data["summary"] || "",
1286 actor_type: actor_type,
1287 also_known_as: Map.get(data, "alsoKnownAs", []),
1288 public_key: public_key,
1289 inbox: data["inbox"],
1290 shared_inbox: shared_inbox,
1291 accepts_chat_messages: accepts_chat_messages
1292 }
1293
1294 # nickname can be nil because of virtual actors
1295 if data["preferredUsername"] do
1296 Map.put(
1297 user_data,
1298 :nickname,
1299 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1300 )
1301 else
1302 Map.put(user_data, :nickname, nil)
1303 end
1304 end
1305
1306 def fetch_follow_information_for_user(user) do
1307 with {:ok, following_data} <-
1308 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1309 {:ok, hide_follows} <- collection_private(following_data),
1310 {:ok, followers_data} <-
1311 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1312 {:ok, hide_followers} <- collection_private(followers_data) do
1313 {:ok,
1314 %{
1315 hide_follows: hide_follows,
1316 follower_count: normalize_counter(followers_data["totalItems"]),
1317 following_count: normalize_counter(following_data["totalItems"]),
1318 hide_followers: hide_followers
1319 }}
1320 else
1321 {:error, _} = e -> e
1322 e -> {:error, e}
1323 end
1324 end
1325
1326 defp normalize_counter(counter) when is_integer(counter), do: counter
1327 defp normalize_counter(_), do: 0
1328
1329 def maybe_update_follow_information(user_data) do
1330 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1331 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1332 {_, true} <-
1333 {:collections_available,
1334 !!(user_data[:following_address] && user_data[:follower_address])},
1335 {:ok, info} <-
1336 fetch_follow_information_for_user(user_data) do
1337 info = Map.merge(user_data[:info] || %{}, info)
1338
1339 user_data
1340 |> Map.put(:info, info)
1341 else
1342 {:user_type_check, false} ->
1343 user_data
1344
1345 {:collections_available, false} ->
1346 user_data
1347
1348 {:enabled, false} ->
1349 user_data
1350
1351 e ->
1352 Logger.error(
1353 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1354 )
1355
1356 user_data
1357 end
1358 end
1359
1360 defp collection_private(%{"first" => %{"type" => type}})
1361 when type in ["CollectionPage", "OrderedCollectionPage"],
1362 do: {:ok, false}
1363
1364 defp collection_private(%{"first" => first}) do
1365 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1366 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1367 {:ok, false}
1368 else
1369 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1370 {:error, _} = e -> e
1371 e -> {:error, e}
1372 end
1373 end
1374
1375 defp collection_private(_data), do: {:ok, true}
1376
1377 def user_data_from_user_object(data) do
1378 with {:ok, data} <- MRF.filter(data) do
1379 {:ok, object_to_user_data(data)}
1380 else
1381 e -> {:error, e}
1382 end
1383 end
1384
1385 def fetch_and_prepare_user_from_ap_id(ap_id) do
1386 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1387 {:ok, data} <- user_data_from_user_object(data) do
1388 {:ok, maybe_update_follow_information(data)}
1389 else
1390 # If this has been deleted, only log a debug and not an error
1391 {:error, "Object has been deleted" = e} ->
1392 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1393 {:error, e}
1394
1395 {:error, {:reject, reason} = e} ->
1396 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1397 {:error, e}
1398
1399 {:error, e} ->
1400 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1401 {:error, e}
1402 end
1403 end
1404
1405 def maybe_handle_clashing_nickname(data) do
1406 with nickname when is_binary(nickname) <- data[:nickname],
1407 %User{} = old_user <- User.get_by_nickname(nickname),
1408 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1409 Logger.info(
1410 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1411 data[:ap_id]
1412 }, renaming."
1413 )
1414
1415 old_user
1416 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1417 |> User.update_and_set_cache()
1418 else
1419 {:ap_id_comparison, true} ->
1420 Logger.info(
1421 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1422 )
1423
1424 _ ->
1425 nil
1426 end
1427 end
1428
1429 def make_user_from_ap_id(ap_id) do
1430 user = User.get_cached_by_ap_id(ap_id)
1431
1432 if user && !User.ap_enabled?(user) do
1433 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1434 else
1435 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1436 if user do
1437 user
1438 |> User.remote_user_changeset(data)
1439 |> User.update_and_set_cache()
1440 else
1441 maybe_handle_clashing_nickname(data)
1442
1443 data
1444 |> User.remote_user_changeset()
1445 |> Repo.insert()
1446 |> User.set_cache()
1447 end
1448 end
1449 end
1450 end
1451
1452 def make_user_from_nickname(nickname) do
1453 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1454 make_user_from_ap_id(ap_id)
1455 else
1456 _e -> {:error, "No AP id in WebFinger"}
1457 end
1458 end
1459
1460 # filter out broken threads
1461 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1462 entire_thread_visible_for_user?(activity, user)
1463 end
1464
1465 # do post-processing on a specific activity
1466 def contain_activity(%Activity{} = activity, %User{} = user) do
1467 contain_broken_threads(activity, user)
1468 end
1469
1470 def fetch_direct_messages_query do
1471 Activity
1472 |> restrict_type(%{type: "Create"})
1473 |> restrict_visibility(%{visibility: "direct"})
1474 |> order_by([activity], asc: activity.id)
1475 end
1476 end