a5f8ba40a0efab5366c16a3ffd482e3bb57acbc8
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 # For Announce activities, we filter the recipients based on following status for any actors
36 # that match actual users. See issue #164 for more information about why this is necessary.
37 defp get_recipients(%{"type" => "Announce"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = User.get_cached_by_ap_id(data["actor"])
42
43 recipients =
44 Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
45 case User.get_cached_by_ap_id(recipient) do
46 nil -> true
47 user -> User.following?(user, actor)
48 end
49 end)
50
51 {recipients, to, cc}
52 end
53
54 defp get_recipients(%{"type" => "Create"} = data) do
55 to = Map.get(data, "to", [])
56 cc = Map.get(data, "cc", [])
57 bcc = Map.get(data, "bcc", [])
58 actor = Map.get(data, "actor", [])
59 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
60 {recipients, to, cc}
61 end
62
63 defp get_recipients(data) do
64 to = Map.get(data, "to", [])
65 cc = Map.get(data, "cc", [])
66 bcc = Map.get(data, "bcc", [])
67 recipients = Enum.concat([to, cc, bcc])
68 {recipients, to, cc}
69 end
70
71 defp check_actor_is_active(nil), do: true
72
73 defp check_actor_is_active(actor) when is_binary(actor) do
74 case User.get_cached_by_ap_id(actor) do
75 %User{deactivated: deactivated} -> not deactivated
76 _ -> false
77 end
78 end
79
80 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
81 limit = Config.get([:instance, :remote_limit])
82 String.length(content) <= limit
83 end
84
85 defp check_remote_limit(_), do: true
86
87 defp increase_note_count_if_public(actor, object) do
88 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
89 end
90
91 def decrease_note_count_if_public(actor, object) do
92 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
93 end
94
95 defp increase_replies_count_if_reply(%{
96 "object" => %{"inReplyTo" => reply_ap_id} = object,
97 "type" => "Create"
98 }) do
99 if is_public?(object) do
100 Object.increase_replies_count(reply_ap_id)
101 end
102 end
103
104 defp increase_replies_count_if_reply(_create_data), do: :noop
105
106 defp increase_poll_votes_if_vote(%{
107 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
108 "type" => "Create",
109 "actor" => actor
110 }) do
111 Object.increase_vote_count(reply_ap_id, name, actor)
112 end
113
114 defp increase_poll_votes_if_vote(_create_data), do: :noop
115
116 @object_types ["ChatMessage"]
117 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
118 def persist(%{"type" => type} = object, meta) when type in @object_types do
119 with {:ok, object} <- Object.create(object) do
120 {:ok, object, meta}
121 end
122 end
123
124 def persist(object, meta) do
125 with local <- Keyword.fetch!(meta, :local),
126 {recipients, _, _} <- get_recipients(object),
127 {:ok, activity} <-
128 Repo.insert(%Activity{
129 data: object,
130 local: local,
131 recipients: recipients,
132 actor: object["actor"]
133 }) do
134 {:ok, activity, meta}
135 end
136 end
137
138 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
139 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
140 with nil <- Activity.normalize(map),
141 map <- lazy_put_activity_defaults(map, fake),
142 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
143 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
144 {:ok, map} <- MRF.filter(map),
145 {recipients, _, _} = get_recipients(map),
146 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
147 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
148 {:ok, map, object} <- insert_full_object(map) do
149 {:ok, activity} =
150 %Activity{
151 data: map,
152 local: local,
153 actor: map["actor"],
154 recipients: recipients
155 }
156 |> Repo.insert()
157 |> maybe_create_activity_expiration()
158
159 # Splice in the child object if we have one.
160 activity = Maps.put_if_present(activity, :object, object)
161
162 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
163
164 {:ok, activity}
165 else
166 %Activity{} = activity ->
167 {:ok, activity}
168
169 {:fake, true, map, recipients} ->
170 activity = %Activity{
171 data: map,
172 local: local,
173 actor: map["actor"],
174 recipients: recipients,
175 id: "pleroma:fakeid"
176 }
177
178 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
179 {:ok, activity}
180
181 error ->
182 {:error, error}
183 end
184 end
185
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
188
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
191 stream_out(activity)
192 stream_out_participations(participations)
193 end
194
195 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
196 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
197 {:ok, activity}
198 end
199 end
200
201 defp maybe_create_activity_expiration(result), do: result
202
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
207 {:ok, conversation}
208 end
209 end
210
211 defp get_participations({:ok, conversation}) do
212 conversation
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
215 end
216
217 defp get_participations(_), do: []
218
219 def stream_out_participations(participations) do
220 participations =
221 participations
222 |> Repo.preload(:user)
223
224 Streamer.stream("participation", participations)
225 end
226
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
230
231 last_activity_id =
232 fetch_latest_activity_id_for_context(conversation.ap_id, %{
233 user: user,
234 blocking_user: user
235 })
236
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
239 end
240 end
241 end
242
243 def stream_out_participations(_, _), do: :noop
244
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
247 activity
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
250 end
251
252 def stream_out(_activity) do
253 :noop
254 end
255
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
259 result
260 end
261 end
262
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
269
270 create_data =
271 make_create_data(
272 %{to: to, actor: actor, published: published, context: context, object: object},
273 additional
274 )
275
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 _ <- increase_poll_votes_if_vote(create_data),
280 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
281 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
282 _ <- notify_and_stream(activity),
283 :ok <- maybe_federate(activity) do
284 {:ok, activity}
285 else
286 {:quick_insert, true, activity} ->
287 {:ok, activity}
288
289 {:fake, true, activity} ->
290 {:ok, activity}
291
292 {:error, message} ->
293 Repo.rollback(message)
294 end
295 end
296
297 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
298 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
299 additional = params[:additional] || %{}
300 # only accept false as false value
301 local = !(params[:local] == false)
302 published = params[:published]
303
304 listen_data =
305 make_listen_data(
306 %{to: to, actor: actor, published: published, context: context, object: object},
307 additional
308 )
309
310 with {:ok, activity} <- insert(listen_data, local),
311 _ <- notify_and_stream(activity),
312 :ok <- maybe_federate(activity) do
313 {:ok, activity}
314 end
315 end
316
317 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
318 def accept(params) do
319 accept_or_reject("Accept", params)
320 end
321
322 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
323 def reject(params) do
324 accept_or_reject("Reject", params)
325 end
326
327 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
328 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
329 local = Map.get(params, :local, true)
330 activity_id = Map.get(params, :activity_id, nil)
331
332 data =
333 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
334 |> Maps.put_if_present("id", activity_id)
335
336 with {:ok, activity} <- insert(data, local),
337 _ <- notify_and_stream(activity),
338 :ok <- maybe_federate(activity) do
339 {:ok, activity}
340 end
341 end
342
343 @spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
344 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
345 local = !(params[:local] == false)
346 activity_id = params[:activity_id]
347
348 data =
349 %{
350 "to" => to,
351 "cc" => cc,
352 "type" => "Update",
353 "actor" => actor,
354 "object" => object
355 }
356 |> Maps.put_if_present("id", activity_id)
357
358 with {:ok, activity} <- insert(data, local),
359 _ <- notify_and_stream(activity),
360 :ok <- maybe_federate(activity) do
361 {:ok, activity}
362 end
363 end
364
365 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
366 {:ok, Activity.t()} | {:error, any()}
367 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
368 with {:ok, result} <-
369 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
370 result
371 end
372 end
373
374 defp do_follow(follower, followed, activity_id, local, opts) do
375 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
376 data = make_follow_data(follower, followed, activity_id)
377
378 with {:ok, activity} <- insert(data, local),
379 _ <- skip_notify_and_stream || notify_and_stream(activity),
380 :ok <- maybe_federate(activity) do
381 {:ok, activity}
382 else
383 {:error, error} -> Repo.rollback(error)
384 end
385 end
386
387 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
388 {:ok, Activity.t()} | nil | {:error, any()}
389 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
390 with {:ok, result} <-
391 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
392 result
393 end
394 end
395
396 defp do_unfollow(follower, followed, activity_id, local) do
397 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
398 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
399 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
400 {:ok, activity} <- insert(unfollow_data, local),
401 _ <- notify_and_stream(activity),
402 :ok <- maybe_federate(activity) do
403 {:ok, activity}
404 else
405 nil -> nil
406 {:error, error} -> Repo.rollback(error)
407 end
408 end
409
410 @spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
411 {:ok, Activity.t()} | {:error, any()}
412 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
413 with {:ok, result} <-
414 Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
415 result
416 end
417 end
418
419 defp do_block(blocker, blocked, activity_id, local) do
420 unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
421
422 if unfollow_blocked and fetch_latest_follow(blocker, blocked) do
423 unfollow(blocker, blocked, nil, local)
424 end
425
426 block_data = make_block_data(blocker, blocked, activity_id)
427
428 with {:ok, activity} <- insert(block_data, local),
429 _ <- notify_and_stream(activity),
430 :ok <- maybe_federate(activity) do
431 {:ok, activity}
432 else
433 {:error, error} -> Repo.rollback(error)
434 end
435 end
436
437 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
438 def flag(
439 %{
440 actor: actor,
441 context: _context,
442 account: account,
443 statuses: statuses,
444 content: content
445 } = params
446 ) do
447 # only accept false as false value
448 local = !(params[:local] == false)
449 forward = !(params[:forward] == false)
450
451 additional = params[:additional] || %{}
452
453 additional =
454 if forward do
455 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
456 else
457 Map.merge(additional, %{"to" => [], "cc" => []})
458 end
459
460 with flag_data <- make_flag_data(params, additional),
461 {:ok, activity} <- insert(flag_data, local),
462 {:ok, stripped_activity} <- strip_report_status_data(activity),
463 _ <- notify_and_stream(activity),
464 :ok <- maybe_federate(stripped_activity) do
465 User.all_superusers()
466 |> Enum.filter(fn user -> not is_nil(user.email) end)
467 |> Enum.each(fn superuser ->
468 superuser
469 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
470 |> Pleroma.Emails.Mailer.deliver_async()
471 end)
472
473 {:ok, activity}
474 end
475 end
476
477 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
478 def move(%User{} = origin, %User{} = target, local \\ true) do
479 params = %{
480 "type" => "Move",
481 "actor" => origin.ap_id,
482 "object" => origin.ap_id,
483 "target" => target.ap_id
484 }
485
486 with true <- origin.ap_id in target.also_known_as,
487 {:ok, activity} <- insert(params, local),
488 _ <- notify_and_stream(activity) do
489 maybe_federate(activity)
490
491 BackgroundWorker.enqueue("move_following", %{
492 "origin_id" => origin.id,
493 "target_id" => target.id
494 })
495
496 {:ok, activity}
497 else
498 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
499 err -> err
500 end
501 end
502
503 def fetch_activities_for_context_query(context, opts) do
504 public = [Constants.as_public()]
505
506 recipients =
507 if opts[:user],
508 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
509 else: public
510
511 from(activity in Activity)
512 |> maybe_preload_objects(opts)
513 |> maybe_preload_bookmarks(opts)
514 |> maybe_set_thread_muted_field(opts)
515 |> restrict_blocked(opts)
516 |> restrict_recipients(recipients, opts[:user])
517 |> where(
518 [activity],
519 fragment(
520 "?->>'type' = ? and ?->>'context' = ?",
521 activity.data,
522 "Create",
523 activity.data,
524 ^context
525 )
526 )
527 |> exclude_poll_votes(opts)
528 |> exclude_id(opts)
529 |> order_by([activity], desc: activity.id)
530 end
531
532 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
533 def fetch_activities_for_context(context, opts \\ %{}) do
534 context
535 |> fetch_activities_for_context_query(opts)
536 |> Repo.all()
537 end
538
539 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
540 FlakeId.Ecto.CompatType.t() | nil
541 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
542 context
543 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
544 |> limit(1)
545 |> select([a], a.id)
546 |> Repo.one()
547 end
548
549 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
550 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
551 opts = Map.delete(opts, :user)
552
553 [Constants.as_public()]
554 |> fetch_activities_query(opts)
555 |> restrict_unlisted(opts)
556 |> Pagination.fetch_paginated(opts, pagination)
557 end
558
559 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
560 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
561 opts
562 |> Map.put(:restrict_unlisted, true)
563 |> fetch_public_or_unlisted_activities(pagination)
564 end
565
566 @valid_visibilities ~w[direct unlisted public private]
567
568 defp restrict_visibility(query, %{visibility: visibility})
569 when is_list(visibility) do
570 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
571 from(
572 a in query,
573 where:
574 fragment(
575 "activity_visibility(?, ?, ?) = ANY (?)",
576 a.actor,
577 a.recipients,
578 a.data,
579 ^visibility
580 )
581 )
582 else
583 Logger.error("Could not restrict visibility to #{visibility}")
584 end
585 end
586
587 defp restrict_visibility(query, %{visibility: visibility})
588 when visibility in @valid_visibilities do
589 from(
590 a in query,
591 where:
592 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
593 )
594 end
595
596 defp restrict_visibility(_query, %{visibility: visibility})
597 when visibility not in @valid_visibilities do
598 Logger.error("Could not restrict visibility to #{visibility}")
599 end
600
601 defp restrict_visibility(query, _visibility), do: query
602
603 defp exclude_visibility(query, %{exclude_visibilities: visibility})
604 when is_list(visibility) do
605 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
606 from(
607 a in query,
608 where:
609 not fragment(
610 "activity_visibility(?, ?, ?) = ANY (?)",
611 a.actor,
612 a.recipients,
613 a.data,
614 ^visibility
615 )
616 )
617 else
618 Logger.error("Could not exclude visibility to #{visibility}")
619 query
620 end
621 end
622
623 defp exclude_visibility(query, %{exclude_visibilities: visibility})
624 when visibility in @valid_visibilities do
625 from(
626 a in query,
627 where:
628 not fragment(
629 "activity_visibility(?, ?, ?) = ?",
630 a.actor,
631 a.recipients,
632 a.data,
633 ^visibility
634 )
635 )
636 end
637
638 defp exclude_visibility(query, %{exclude_visibilities: visibility})
639 when visibility not in [nil | @valid_visibilities] do
640 Logger.error("Could not exclude visibility to #{visibility}")
641 query
642 end
643
644 defp exclude_visibility(query, _visibility), do: query
645
646 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
647 do: query
648
649 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
650 do: query
651
652 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
653 from(
654 a in query,
655 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
656 )
657 end
658
659 defp restrict_thread_visibility(query, _, _), do: query
660
661 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
662 params =
663 params
664 |> Map.put(:user, reading_user)
665 |> Map.put(:actor_id, user.ap_id)
666
667 %{
668 godmode: params[:godmode],
669 reading_user: reading_user
670 }
671 |> user_activities_recipients()
672 |> fetch_activities(params)
673 |> Enum.reverse()
674 end
675
676 def fetch_user_activities(user, reading_user, params \\ %{}) do
677 params =
678 params
679 |> Map.put(:type, ["Create", "Announce"])
680 |> Map.put(:user, reading_user)
681 |> Map.put(:actor_id, user.ap_id)
682 |> Map.put(:pinned_activity_ids, user.pinned_activities)
683
684 params =
685 if User.blocks?(reading_user, user) do
686 params
687 else
688 params
689 |> Map.put(:blocking_user, reading_user)
690 |> Map.put(:muting_user, reading_user)
691 end
692
693 %{
694 godmode: params[:godmode],
695 reading_user: reading_user
696 }
697 |> user_activities_recipients()
698 |> fetch_activities(params)
699 |> Enum.reverse()
700 end
701
702 def fetch_statuses(reading_user, params) do
703 params = Map.put(params, :type, ["Create", "Announce"])
704
705 %{
706 godmode: params[:godmode],
707 reading_user: reading_user
708 }
709 |> user_activities_recipients()
710 |> fetch_activities(params, :offset)
711 |> Enum.reverse()
712 end
713
714 defp user_activities_recipients(%{godmode: true}), do: []
715
716 defp user_activities_recipients(%{reading_user: reading_user}) do
717 if reading_user do
718 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
719 else
720 [Constants.as_public()]
721 end
722 end
723
724 defp restrict_since(query, %{since_id: ""}), do: query
725
726 defp restrict_since(query, %{since_id: since_id}) do
727 from(activity in query, where: activity.id > ^since_id)
728 end
729
730 defp restrict_since(query, _), do: query
731
732 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
733 raise "Can't use the child object without preloading!"
734 end
735
736 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
737 from(
738 [_activity, object] in query,
739 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
740 )
741 end
742
743 defp restrict_tag_reject(query, _), do: query
744
745 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
746 raise "Can't use the child object without preloading!"
747 end
748
749 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
750 from(
751 [_activity, object] in query,
752 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
753 )
754 end
755
756 defp restrict_tag_all(query, _), do: query
757
758 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
759 raise "Can't use the child object without preloading!"
760 end
761
762 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
763 from(
764 [_activity, object] in query,
765 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
766 )
767 end
768
769 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
770 from(
771 [_activity, object] in query,
772 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
773 )
774 end
775
776 defp restrict_tag(query, _), do: query
777
778 defp restrict_recipients(query, [], _user), do: query
779
780 defp restrict_recipients(query, recipients, nil) do
781 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
782 end
783
784 defp restrict_recipients(query, recipients, user) do
785 from(
786 activity in query,
787 where: fragment("? && ?", ^recipients, activity.recipients),
788 or_where: activity.actor == ^user.ap_id
789 )
790 end
791
792 defp restrict_local(query, %{local_only: true}) do
793 from(activity in query, where: activity.local == true)
794 end
795
796 defp restrict_local(query, _), do: query
797
798 defp restrict_actor(query, %{actor_id: actor_id}) do
799 from(activity in query, where: activity.actor == ^actor_id)
800 end
801
802 defp restrict_actor(query, _), do: query
803
804 defp restrict_type(query, %{type: type}) when is_binary(type) do
805 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
806 end
807
808 defp restrict_type(query, %{type: type}) do
809 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
810 end
811
812 defp restrict_type(query, _), do: query
813
814 defp restrict_state(query, %{state: state}) do
815 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
816 end
817
818 defp restrict_state(query, _), do: query
819
820 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
821 from(
822 [_activity, object] in query,
823 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
824 )
825 end
826
827 defp restrict_favorited_by(query, _), do: query
828
829 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
830 raise "Can't use the child object without preloading!"
831 end
832
833 defp restrict_media(query, %{only_media: true}) do
834 from(
835 [_activity, object] in query,
836 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
837 )
838 end
839
840 defp restrict_media(query, _), do: query
841
842 defp restrict_replies(query, %{exclude_replies: true}) do
843 from(
844 [_activity, object] in query,
845 where: fragment("?->>'inReplyTo' is null", object.data)
846 )
847 end
848
849 defp restrict_replies(query, %{
850 reply_filtering_user: user,
851 reply_visibility: "self"
852 }) do
853 from(
854 [activity, object] in query,
855 where:
856 fragment(
857 "?->>'inReplyTo' is null OR ? = ANY(?)",
858 object.data,
859 ^user.ap_id,
860 activity.recipients
861 )
862 )
863 end
864
865 defp restrict_replies(query, %{
866 reply_filtering_user: user,
867 reply_visibility: "following"
868 }) do
869 from(
870 [activity, object] in query,
871 where:
872 fragment(
873 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
874 object.data,
875 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
876 activity.recipients,
877 activity.actor,
878 activity.actor,
879 ^user.ap_id
880 )
881 )
882 end
883
884 defp restrict_replies(query, _), do: query
885
886 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
887 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
888 end
889
890 defp restrict_reblogs(query, _), do: query
891
892 defp restrict_muted(query, %{with_muted: true}), do: query
893
894 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
895 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
896
897 query =
898 from([activity] in query,
899 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
900 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
901 )
902
903 unless opts[:skip_preload] do
904 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
905 else
906 query
907 end
908 end
909
910 defp restrict_muted(query, _), do: query
911
912 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
913 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
914 domain_blocks = user.domain_blocks || []
915
916 following_ap_ids = User.get_friends_ap_ids(user)
917
918 query =
919 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
920
921 from(
922 [activity, object: o] in query,
923 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
924 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
925 where:
926 fragment(
927 "recipients_contain_blocked_domains(?, ?) = false",
928 activity.recipients,
929 ^domain_blocks
930 ),
931 where:
932 fragment(
933 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
934 activity.data,
935 activity.data,
936 ^blocked_ap_ids
937 ),
938 where:
939 fragment(
940 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
941 activity.actor,
942 ^domain_blocks,
943 activity.actor,
944 ^following_ap_ids
945 ),
946 where:
947 fragment(
948 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
949 o.data,
950 ^domain_blocks,
951 o.data,
952 ^following_ap_ids
953 )
954 )
955 end
956
957 defp restrict_blocked(query, _), do: query
958
959 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
960 from(
961 activity in query,
962 where:
963 fragment(
964 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
965 activity.data,
966 ^[Constants.as_public()]
967 )
968 )
969 end
970
971 defp restrict_unlisted(query, _), do: query
972
973 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
974 from(activity in query, where: activity.id in ^ids)
975 end
976
977 defp restrict_pinned(query, _), do: query
978
979 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
980 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
981
982 from(
983 activity in query,
984 where:
985 fragment(
986 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
987 activity.data,
988 activity.actor,
989 ^muted_reblogs
990 )
991 )
992 end
993
994 defp restrict_muted_reblogs(query, _), do: query
995
996 defp restrict_instance(query, %{instance: instance}) do
997 users =
998 from(
999 u in User,
1000 select: u.ap_id,
1001 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
1002 )
1003 |> Repo.all()
1004
1005 from(activity in query, where: activity.actor in ^users)
1006 end
1007
1008 defp restrict_instance(query, _), do: query
1009
1010 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1011
1012 defp exclude_poll_votes(query, _) do
1013 if has_named_binding?(query, :object) do
1014 from([activity, object: o] in query,
1015 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1016 )
1017 else
1018 query
1019 end
1020 end
1021
1022 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1023
1024 defp exclude_chat_messages(query, _) do
1025 if has_named_binding?(query, :object) do
1026 from([activity, object: o] in query,
1027 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1028 )
1029 else
1030 query
1031 end
1032 end
1033
1034 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1035
1036 defp exclude_invisible_actors(query, _opts) do
1037 invisible_ap_ids =
1038 User.Query.build(%{invisible: true, select: [:ap_id]})
1039 |> Repo.all()
1040 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1041
1042 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1043 end
1044
1045 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1046 from(activity in query, where: activity.id != ^id)
1047 end
1048
1049 defp exclude_id(query, _), do: query
1050
1051 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1052
1053 defp maybe_preload_objects(query, _) do
1054 query
1055 |> Activity.with_preloaded_object()
1056 end
1057
1058 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1059
1060 defp maybe_preload_bookmarks(query, opts) do
1061 query
1062 |> Activity.with_preloaded_bookmark(opts[:user])
1063 end
1064
1065 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1066 query
1067 |> Activity.with_preloaded_report_notes()
1068 end
1069
1070 defp maybe_preload_report_notes(query, _), do: query
1071
1072 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1073
1074 defp maybe_set_thread_muted_field(query, opts) do
1075 query
1076 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1077 end
1078
1079 defp maybe_order(query, %{order: :desc}) do
1080 query
1081 |> order_by(desc: :id)
1082 end
1083
1084 defp maybe_order(query, %{order: :asc}) do
1085 query
1086 |> order_by(asc: :id)
1087 end
1088
1089 defp maybe_order(query, _), do: query
1090
1091 defp fetch_activities_query_ap_ids_ops(opts) do
1092 source_user = opts[:muting_user]
1093 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1094
1095 ap_id_relationships =
1096 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1097 [:block | ap_id_relationships]
1098 else
1099 ap_id_relationships
1100 end
1101
1102 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1103
1104 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1105 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1106
1107 restrict_muted_reblogs_opts =
1108 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1109
1110 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1111 end
1112
1113 def fetch_activities_query(recipients, opts \\ %{}) do
1114 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1115 fetch_activities_query_ap_ids_ops(opts)
1116
1117 config = %{
1118 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1119 }
1120
1121 Activity
1122 |> maybe_preload_objects(opts)
1123 |> maybe_preload_bookmarks(opts)
1124 |> maybe_preload_report_notes(opts)
1125 |> maybe_set_thread_muted_field(opts)
1126 |> maybe_order(opts)
1127 |> restrict_recipients(recipients, opts[:user])
1128 |> restrict_replies(opts)
1129 |> restrict_tag(opts)
1130 |> restrict_tag_reject(opts)
1131 |> restrict_tag_all(opts)
1132 |> restrict_since(opts)
1133 |> restrict_local(opts)
1134 |> restrict_actor(opts)
1135 |> restrict_type(opts)
1136 |> restrict_state(opts)
1137 |> restrict_favorited_by(opts)
1138 |> restrict_blocked(restrict_blocked_opts)
1139 |> restrict_muted(restrict_muted_opts)
1140 |> restrict_media(opts)
1141 |> restrict_visibility(opts)
1142 |> restrict_thread_visibility(opts, config)
1143 |> restrict_reblogs(opts)
1144 |> restrict_pinned(opts)
1145 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1146 |> restrict_instance(opts)
1147 |> Activity.restrict_deactivated_users()
1148 |> exclude_poll_votes(opts)
1149 |> exclude_chat_messages(opts)
1150 |> exclude_invisible_actors(opts)
1151 |> exclude_visibility(opts)
1152 end
1153
1154 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1155 list_memberships = Pleroma.List.memberships(opts[:user])
1156
1157 fetch_activities_query(recipients ++ list_memberships, opts)
1158 |> Pagination.fetch_paginated(opts, pagination)
1159 |> Enum.reverse()
1160 |> maybe_update_cc(list_memberships, opts[:user])
1161 end
1162
1163 @doc """
1164 Fetch favorites activities of user with order by sort adds to favorites
1165 """
1166 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1167 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1168 user.ap_id
1169 |> Activity.Queries.by_actor()
1170 |> Activity.Queries.by_type("Like")
1171 |> Activity.with_joined_object()
1172 |> Object.with_joined_activity()
1173 |> select([_like, object, activity], %{activity | object: object})
1174 |> order_by([like, _, _], desc_nulls_last: like.id)
1175 |> Pagination.fetch_paginated(
1176 Map.merge(params, %{skip_order: true}),
1177 pagination,
1178 :object_activity
1179 )
1180 end
1181
1182 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1183 Enum.map(activities, fn
1184 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1185 if Enum.any?(bcc, &(&1 in list_memberships)) do
1186 update_in(activity.data["cc"], &[user_ap_id | &1])
1187 else
1188 activity
1189 end
1190
1191 activity ->
1192 activity
1193 end)
1194 end
1195
1196 defp maybe_update_cc(activities, _, _), do: activities
1197
1198 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1199 from(activity in query,
1200 where:
1201 fragment("? && ?", activity.recipients, ^recipients) or
1202 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1203 ^Constants.as_public() in activity.recipients)
1204 )
1205 end
1206
1207 def fetch_activities_bounded(
1208 recipients,
1209 recipients_with_public,
1210 opts \\ %{},
1211 pagination \\ :keyset
1212 ) do
1213 fetch_activities_query([], opts)
1214 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1215 |> Pagination.fetch_paginated(opts, pagination)
1216 |> Enum.reverse()
1217 end
1218
1219 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1220 def upload(file, opts \\ []) do
1221 with {:ok, data} <- Upload.store(file, opts) do
1222 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1223
1224 Repo.insert(%Object{data: obj_data})
1225 end
1226 end
1227
1228 @spec get_actor_url(any()) :: binary() | nil
1229 defp get_actor_url(url) when is_binary(url), do: url
1230 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1231
1232 defp get_actor_url(url) when is_list(url) do
1233 url
1234 |> List.first()
1235 |> get_actor_url()
1236 end
1237
1238 defp get_actor_url(_url), do: nil
1239
1240 defp object_to_user_data(data) do
1241 avatar =
1242 data["icon"]["url"] &&
1243 %{
1244 "type" => "Image",
1245 "url" => [%{"href" => data["icon"]["url"]}]
1246 }
1247
1248 banner =
1249 data["image"]["url"] &&
1250 %{
1251 "type" => "Image",
1252 "url" => [%{"href" => data["image"]["url"]}]
1253 }
1254
1255 fields =
1256 data
1257 |> Map.get("attachment", [])
1258 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1259 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1260
1261 emojis =
1262 data
1263 |> Map.get("tag", [])
1264 |> Enum.filter(fn
1265 %{"type" => "Emoji"} -> true
1266 _ -> false
1267 end)
1268 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1269 {String.trim(name, ":"), url}
1270 end)
1271
1272 locked = data["manuallyApprovesFollowers"] || false
1273 data = Transmogrifier.maybe_fix_user_object(data)
1274 discoverable = data["discoverable"] || false
1275 invisible = data["invisible"] || false
1276 actor_type = data["type"] || "Person"
1277
1278 public_key =
1279 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1280 data["publicKey"]["publicKeyPem"]
1281 else
1282 nil
1283 end
1284
1285 shared_inbox =
1286 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1287 data["endpoints"]["sharedInbox"]
1288 else
1289 nil
1290 end
1291
1292 user_data = %{
1293 ap_id: data["id"],
1294 uri: get_actor_url(data["url"]),
1295 ap_enabled: true,
1296 banner: banner,
1297 fields: fields,
1298 emoji: emojis,
1299 locked: locked,
1300 discoverable: discoverable,
1301 invisible: invisible,
1302 avatar: avatar,
1303 name: data["name"],
1304 follower_address: data["followers"],
1305 following_address: data["following"],
1306 bio: data["summary"],
1307 actor_type: actor_type,
1308 also_known_as: Map.get(data, "alsoKnownAs", []),
1309 public_key: public_key,
1310 inbox: data["inbox"],
1311 shared_inbox: shared_inbox
1312 }
1313
1314 # nickname can be nil because of virtual actors
1315 if data["preferredUsername"] do
1316 Map.put(
1317 user_data,
1318 :nickname,
1319 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1320 )
1321 else
1322 Map.put(user_data, :nickname, nil)
1323 end
1324 end
1325
1326 def fetch_follow_information_for_user(user) do
1327 with {:ok, following_data} <-
1328 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1329 {:ok, hide_follows} <- collection_private(following_data),
1330 {:ok, followers_data} <-
1331 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1332 {:ok, hide_followers} <- collection_private(followers_data) do
1333 {:ok,
1334 %{
1335 hide_follows: hide_follows,
1336 follower_count: normalize_counter(followers_data["totalItems"]),
1337 following_count: normalize_counter(following_data["totalItems"]),
1338 hide_followers: hide_followers
1339 }}
1340 else
1341 {:error, _} = e -> e
1342 e -> {:error, e}
1343 end
1344 end
1345
1346 defp normalize_counter(counter) when is_integer(counter), do: counter
1347 defp normalize_counter(_), do: 0
1348
1349 def maybe_update_follow_information(user_data) do
1350 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1351 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1352 {_, true} <-
1353 {:collections_available,
1354 !!(user_data[:following_address] && user_data[:follower_address])},
1355 {:ok, info} <-
1356 fetch_follow_information_for_user(user_data) do
1357 info = Map.merge(user_data[:info] || %{}, info)
1358
1359 user_data
1360 |> Map.put(:info, info)
1361 else
1362 {:user_type_check, false} ->
1363 user_data
1364
1365 {:collections_available, false} ->
1366 user_data
1367
1368 {:enabled, false} ->
1369 user_data
1370
1371 e ->
1372 Logger.error(
1373 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1374 )
1375
1376 user_data
1377 end
1378 end
1379
1380 defp collection_private(%{"first" => %{"type" => type}})
1381 when type in ["CollectionPage", "OrderedCollectionPage"],
1382 do: {:ok, false}
1383
1384 defp collection_private(%{"first" => first}) do
1385 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1386 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1387 {:ok, false}
1388 else
1389 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1390 {:error, _} = e -> e
1391 e -> {:error, e}
1392 end
1393 end
1394
1395 defp collection_private(_data), do: {:ok, true}
1396
1397 def user_data_from_user_object(data) do
1398 with {:ok, data} <- MRF.filter(data) do
1399 {:ok, object_to_user_data(data)}
1400 else
1401 e -> {:error, e}
1402 end
1403 end
1404
1405 def fetch_and_prepare_user_from_ap_id(ap_id) do
1406 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1407 {:ok, data} <- user_data_from_user_object(data) do
1408 {:ok, maybe_update_follow_information(data)}
1409 else
1410 {:error, "Object has been deleted" = e} ->
1411 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1412 {:error, e}
1413
1414 {:error, e} ->
1415 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1416 {:error, e}
1417 end
1418 end
1419
1420 def make_user_from_ap_id(ap_id) do
1421 user = User.get_cached_by_ap_id(ap_id)
1422
1423 if user && !User.ap_enabled?(user) do
1424 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1425 else
1426 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1427 if user do
1428 user
1429 |> User.remote_user_changeset(data)
1430 |> User.update_and_set_cache()
1431 else
1432 data
1433 |> User.remote_user_changeset()
1434 |> Repo.insert()
1435 |> User.set_cache()
1436 end
1437 end
1438 end
1439 end
1440
1441 def make_user_from_nickname(nickname) do
1442 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1443 make_user_from_ap_id(ap_id)
1444 else
1445 _e -> {:error, "No AP id in WebFinger"}
1446 end
1447 end
1448
1449 # filter out broken threads
1450 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1451 entire_thread_visible_for_user?(activity, user)
1452 end
1453
1454 # do post-processing on a specific activity
1455 def contain_activity(%Activity{} = activity, %User{} = user) do
1456 contain_broken_threads(activity, user)
1457 end
1458
1459 def fetch_direct_messages_query do
1460 Activity
1461 |> restrict_type(%{type: "Create"})
1462 |> restrict_visibility(%{visibility: "direct"})
1463 |> order_by([activity], asc: activity.id)
1464 end
1465 end