1c91bc07482b22ac3905512860537f5a009486d3
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
41 {recipients, to, cc}
42 end
43
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
49 {recipients, to, cc}
50 end
51
52 defp check_actor_is_active(nil), do: true
53
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
57 _ -> false
58 end
59 end
60
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
64 end
65
66 defp check_remote_limit(_), do: true
67
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
70 end
71
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
74 end
75
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
78 "type" => "Create"
79 }) do
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
82 end
83 end
84
85 defp increase_replies_count_if_reply(_create_data), do: :noop
86
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
91 {:ok, object, meta}
92 end
93 end
94
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
98 {:ok, activity} <-
99 Repo.insert(%Activity{
100 data: object,
101 local: local,
102 recipients: recipients,
103 actor: object["actor"]
104 }),
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
108 end
109 end
110
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
125
126 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
127 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
128 end)
129
130 {:ok, activity}
131 else
132 %Activity{} = activity ->
133 {:ok, activity}
134
135 {:actor_check, _} ->
136 {:error, false}
137
138 {:containment, _} = error ->
139 error
140
141 {:error, _} = error ->
142 error
143
144 {:fake, true, map, recipients} ->
145 activity = %Activity{
146 data: map,
147 local: local,
148 actor: map["actor"],
149 recipients: recipients,
150 id: "pleroma:fakeid"
151 }
152
153 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
154 {:ok, activity}
155
156 {:remote_limit_pass, _} ->
157 {:error, :remote_limit}
158
159 {:reject, _} = e ->
160 {:error, e}
161 end
162 end
163
164 defp insert_activity_with_expiration(data, local, recipients) do
165 struct = %Activity{
166 data: data,
167 local: local,
168 actor: data["actor"],
169 recipients: recipients
170 }
171
172 with {:ok, activity} <- Repo.insert(struct) do
173 maybe_create_activity_expiration(activity)
174 end
175 end
176
177 def notify_and_stream(activity) do
178 Notification.create_notifications(activity)
179
180 conversation = create_or_bump_conversation(activity, activity.actor)
181 participations = get_participations(conversation)
182 stream_out(activity)
183 stream_out_participations(participations)
184 end
185
186 defp maybe_create_activity_expiration(
187 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 ) do
189 with {:ok, _job} <-
190 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
191 activity_id: activity.id,
192 expires_at: expires_at
193 }) do
194 {:ok, activity}
195 end
196 end
197
198 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
199
200 defp create_or_bump_conversation(activity, actor) do
201 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
202 %User{} = user <- User.get_cached_by_ap_id(actor) do
203 Participation.mark_as_read(user, conversation)
204 {:ok, conversation}
205 end
206 end
207
208 defp get_participations({:ok, conversation}) do
209 conversation
210 |> Repo.preload(:participations, force: true)
211 |> Map.get(:participations)
212 end
213
214 defp get_participations(_), do: []
215
216 def stream_out_participations(participations) do
217 participations =
218 participations
219 |> Repo.preload(:user)
220
221 Streamer.stream("participation", participations)
222 end
223
224 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
225 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
226 conversation = Repo.preload(conversation, :participations)
227
228 last_activity_id =
229 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
230 user: user,
231 blocking_user: user
232 })
233
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
236 end
237 end
238 end
239
240 def stream_out_participations(_, _), do: :noop
241
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
244 activity
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
247 end
248
249 def stream_out(_activity) do
250 :noop
251 end
252
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
256 result
257 end
258 end
259
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # only accept false as false value
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
266
267 create_data =
268 make_create_data(
269 %{to: to, actor: actor, published: published, context: context, object: object},
270 additional
271 )
272
273 with {:ok, activity} <- insert(create_data, local, fake),
274 {:fake, false, activity} <- {:fake, fake, activity},
275 _ <- increase_replies_count_if_reply(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 _ <- notify_and_stream(activity),
279 :ok <- maybe_federate(activity) do
280 {:ok, activity}
281 else
282 {:quick_insert, true, activity} ->
283 {:ok, activity}
284
285 {:fake, true, activity} ->
286 {:ok, activity}
287
288 {:error, message} ->
289 Repo.rollback(message)
290 end
291 end
292
293 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
295 additional = params[:additional] || %{}
296 # only accept false as false value
297 local = !(params[:local] == false)
298 published = params[:published]
299
300 listen_data =
301 make_listen_data(
302 %{to: to, actor: actor, published: published, context: context, object: object},
303 additional
304 )
305
306 with {:ok, activity} <- insert(listen_data, local),
307 _ <- notify_and_stream(activity),
308 :ok <- maybe_federate(activity) do
309 {:ok, activity}
310 end
311 end
312
313 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
314 {:ok, Activity.t()} | nil | {:error, any()}
315 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
316 with {:ok, result} <-
317 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
318 result
319 end
320 end
321
322 defp do_unfollow(follower, followed, activity_id, local) do
323 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
324 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
325 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
326 {:ok, activity} <- insert(unfollow_data, local),
327 _ <- notify_and_stream(activity),
328 :ok <- maybe_federate(activity) do
329 {:ok, activity}
330 else
331 nil -> nil
332 {:error, error} -> Repo.rollback(error)
333 end
334 end
335
336 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
337 def flag(params) do
338 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
339 result
340 end
341 end
342
343 defp do_flag(
344 %{
345 actor: actor,
346 context: _context,
347 account: account,
348 statuses: statuses,
349 content: content
350 } = params
351 ) do
352 # only accept false as false value
353 local = !(params[:local] == false)
354 forward = !(params[:forward] == false)
355
356 additional = params[:additional] || %{}
357
358 additional =
359 if forward do
360 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
361 else
362 Map.merge(additional, %{"to" => [], "cc" => []})
363 end
364
365 with flag_data <- make_flag_data(params, additional),
366 {:ok, activity} <- insert(flag_data, local),
367 {:ok, stripped_activity} <- strip_report_status_data(activity),
368 _ <- notify_and_stream(activity),
369 :ok <-
370 maybe_federate(stripped_activity) do
371 User.all_superusers()
372 |> Enum.filter(fn user -> not is_nil(user.email) end)
373 |> Enum.each(fn superuser ->
374 superuser
375 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
376 |> Pleroma.Emails.Mailer.deliver_async()
377 end)
378
379 {:ok, activity}
380 else
381 {:error, error} -> Repo.rollback(error)
382 end
383 end
384
385 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
386 def move(%User{} = origin, %User{} = target, local \\ true) do
387 params = %{
388 "type" => "Move",
389 "actor" => origin.ap_id,
390 "object" => origin.ap_id,
391 "target" => target.ap_id
392 }
393
394 with true <- origin.ap_id in target.also_known_as,
395 {:ok, activity} <- insert(params, local),
396 _ <- notify_and_stream(activity) do
397 maybe_federate(activity)
398
399 BackgroundWorker.enqueue("move_following", %{
400 "origin_id" => origin.id,
401 "target_id" => target.id
402 })
403
404 {:ok, activity}
405 else
406 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
407 err -> err
408 end
409 end
410
411 def fetch_activities_for_context_query(context, opts) do
412 public = [Constants.as_public()]
413
414 recipients =
415 if opts[:user],
416 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
417 else: public
418
419 from(activity in Activity)
420 |> maybe_preload_objects(opts)
421 |> maybe_preload_bookmarks(opts)
422 |> maybe_set_thread_muted_field(opts)
423 |> restrict_blocked(opts)
424 |> restrict_recipients(recipients, opts[:user])
425 |> restrict_filtered(opts)
426 |> where(
427 [activity],
428 fragment(
429 "?->>'type' = ? and ?->>'context' = ?",
430 activity.data,
431 "Create",
432 activity.data,
433 ^context
434 )
435 )
436 |> exclude_poll_votes(opts)
437 |> exclude_id(opts)
438 |> order_by([activity], desc: activity.id)
439 end
440
441 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
442 def fetch_activities_for_context(context, opts \\ %{}) do
443 context
444 |> fetch_activities_for_context_query(opts)
445 |> Repo.all()
446 end
447
448 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
449 FlakeId.Ecto.CompatType.t() | nil
450 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
453 |> restrict_visibility(%{visibility: "direct"})
454 |> limit(1)
455 |> select([a], a.id)
456 |> Repo.one()
457 end
458
459 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
460 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
461 opts = Map.delete(opts, :user)
462
463 [Constants.as_public()]
464 |> fetch_activities_query(opts)
465 |> restrict_unlisted(opts)
466 |> Pagination.fetch_paginated(opts, pagination)
467 end
468
469 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
470 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
471 opts
472 |> Map.put(:restrict_unlisted, true)
473 |> fetch_public_or_unlisted_activities(pagination)
474 end
475
476 @valid_visibilities ~w[direct unlisted public private]
477
478 defp restrict_visibility(query, %{visibility: visibility})
479 when is_list(visibility) do
480 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
481 from(
482 a in query,
483 where:
484 fragment(
485 "activity_visibility(?, ?, ?) = ANY (?)",
486 a.actor,
487 a.recipients,
488 a.data,
489 ^visibility
490 )
491 )
492 else
493 Logger.error("Could not restrict visibility to #{visibility}")
494 end
495 end
496
497 defp restrict_visibility(query, %{visibility: visibility})
498 when visibility in @valid_visibilities do
499 from(
500 a in query,
501 where:
502 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
503 )
504 end
505
506 defp restrict_visibility(_query, %{visibility: visibility})
507 when visibility not in @valid_visibilities do
508 Logger.error("Could not restrict visibility to #{visibility}")
509 end
510
511 defp restrict_visibility(query, _visibility), do: query
512
513 defp exclude_visibility(query, %{exclude_visibilities: visibility})
514 when is_list(visibility) do
515 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
516 from(
517 a in query,
518 where:
519 not fragment(
520 "activity_visibility(?, ?, ?) = ANY (?)",
521 a.actor,
522 a.recipients,
523 a.data,
524 ^visibility
525 )
526 )
527 else
528 Logger.error("Could not exclude visibility to #{visibility}")
529 query
530 end
531 end
532
533 defp exclude_visibility(query, %{exclude_visibilities: visibility})
534 when visibility in @valid_visibilities do
535 from(
536 a in query,
537 where:
538 not fragment(
539 "activity_visibility(?, ?, ?) = ?",
540 a.actor,
541 a.recipients,
542 a.data,
543 ^visibility
544 )
545 )
546 end
547
548 defp exclude_visibility(query, %{exclude_visibilities: visibility})
549 when visibility not in [nil | @valid_visibilities] do
550 Logger.error("Could not exclude visibility to #{visibility}")
551 query
552 end
553
554 defp exclude_visibility(query, _visibility), do: query
555
556 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
557 do: query
558
559 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
560 do: query
561
562 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
563 from(
564 a in query,
565 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
566 )
567 end
568
569 defp restrict_thread_visibility(query, _, _), do: query
570
571 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
572 params =
573 params
574 |> Map.put(:user, reading_user)
575 |> Map.put(:actor_id, user.ap_id)
576
577 %{
578 godmode: params[:godmode],
579 reading_user: reading_user
580 }
581 |> user_activities_recipients()
582 |> fetch_activities(params)
583 |> Enum.reverse()
584 end
585
586 def fetch_user_activities(user, reading_user, params \\ %{}) do
587 params =
588 params
589 |> Map.put(:type, ["Create", "Announce"])
590 |> Map.put(:user, reading_user)
591 |> Map.put(:actor_id, user.ap_id)
592 |> Map.put(:pinned_activity_ids, user.pinned_activities)
593
594 params =
595 if User.blocks?(reading_user, user) do
596 params
597 else
598 params
599 |> Map.put(:blocking_user, reading_user)
600 |> Map.put(:muting_user, reading_user)
601 end
602
603 %{
604 godmode: params[:godmode],
605 reading_user: reading_user
606 }
607 |> user_activities_recipients()
608 |> fetch_activities(params)
609 |> Enum.reverse()
610 end
611
612 def fetch_statuses(reading_user, params) do
613 params = Map.put(params, :type, ["Create", "Announce"])
614
615 %{
616 godmode: params[:godmode],
617 reading_user: reading_user
618 }
619 |> user_activities_recipients()
620 |> fetch_activities(params, :offset)
621 |> Enum.reverse()
622 end
623
624 defp user_activities_recipients(%{godmode: true}), do: []
625
626 defp user_activities_recipients(%{reading_user: reading_user}) do
627 if reading_user do
628 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
629 else
630 [Constants.as_public()]
631 end
632 end
633
634 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
635 raise "Can't use the child object without preloading!"
636 end
637
638 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
639 from(
640 [activity, object] in query,
641 where:
642 fragment(
643 "?->>'type' != ? or ?->>'actor' != ?",
644 activity.data,
645 "Announce",
646 object.data,
647 ^actor
648 )
649 )
650 end
651
652 defp restrict_announce_object_actor(query, _), do: query
653
654 defp restrict_since(query, %{since_id: ""}), do: query
655
656 defp restrict_since(query, %{since_id: since_id}) do
657 from(activity in query, where: activity.id > ^since_id)
658 end
659
660 defp restrict_since(query, _), do: query
661
662 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
663 raise "Can't use the child object without preloading!"
664 end
665
666 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
667 from(
668 [_activity, object] in query,
669 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
670 )
671 end
672
673 defp restrict_tag_reject(query, _), do: query
674
675 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
676 raise "Can't use the child object without preloading!"
677 end
678
679 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
680 from(
681 [_activity, object] in query,
682 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
683 )
684 end
685
686 defp restrict_tag_all(query, _), do: query
687
688 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
689 raise "Can't use the child object without preloading!"
690 end
691
692 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
693 from(
694 [_activity, object] in query,
695 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
696 )
697 end
698
699 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
700 from(
701 [_activity, object] in query,
702 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
703 )
704 end
705
706 defp restrict_tag(query, _), do: query
707
708 defp restrict_recipients(query, [], _user), do: query
709
710 defp restrict_recipients(query, recipients, nil) do
711 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
712 end
713
714 defp restrict_recipients(query, recipients, user) do
715 from(
716 activity in query,
717 where: fragment("? && ?", ^recipients, activity.recipients),
718 or_where: activity.actor == ^user.ap_id
719 )
720 end
721
722 defp restrict_local(query, %{local_only: true}) do
723 from(activity in query, where: activity.local == true)
724 end
725
726 defp restrict_local(query, _), do: query
727
728 defp restrict_actor(query, %{actor_id: actor_id}) do
729 from(activity in query, where: activity.actor == ^actor_id)
730 end
731
732 defp restrict_actor(query, _), do: query
733
734 defp restrict_type(query, %{type: type}) when is_binary(type) do
735 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
736 end
737
738 defp restrict_type(query, %{type: type}) do
739 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
740 end
741
742 defp restrict_type(query, _), do: query
743
744 defp restrict_state(query, %{state: state}) do
745 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
746 end
747
748 defp restrict_state(query, _), do: query
749
750 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
751 from(
752 [_activity, object] in query,
753 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
754 )
755 end
756
757 defp restrict_favorited_by(query, _), do: query
758
759 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
760 raise "Can't use the child object without preloading!"
761 end
762
763 defp restrict_media(query, %{only_media: true}) do
764 from(
765 [activity, object] in query,
766 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
767 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
768 )
769 end
770
771 defp restrict_media(query, _), do: query
772
773 defp restrict_replies(query, %{exclude_replies: true}) do
774 from(
775 [_activity, object] in query,
776 where: fragment("?->>'inReplyTo' is null", object.data)
777 )
778 end
779
780 defp restrict_replies(query, %{
781 reply_filtering_user: %User{} = user,
782 reply_visibility: "self"
783 }) do
784 from(
785 [activity, object] in query,
786 where:
787 fragment(
788 "?->>'inReplyTo' is null OR ? = ANY(?)",
789 object.data,
790 ^user.ap_id,
791 activity.recipients
792 )
793 )
794 end
795
796 defp restrict_replies(query, %{
797 reply_filtering_user: %User{} = user,
798 reply_visibility: "following"
799 }) do
800 from(
801 [activity, object] in query,
802 where:
803 fragment(
804 """
805 ?->>'type' != 'Create' -- This isn't a Create
806 OR ?->>'inReplyTo' is null -- this isn't a reply
807 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
808 -- unless they are the author (because authors
809 -- are also part of the recipients). This leads
810 -- to a bug that self-replies by friends won't
811 -- show up.
812 OR ? = ? -- The actor is us
813 """,
814 activity.data,
815 object.data,
816 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
817 activity.recipients,
818 activity.actor,
819 activity.actor,
820 ^user.ap_id
821 )
822 )
823 end
824
825 defp restrict_replies(query, _), do: query
826
827 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
828 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
829 end
830
831 defp restrict_reblogs(query, _), do: query
832
833 defp restrict_muted(query, %{with_muted: true}), do: query
834
835 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
836 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
837
838 query =
839 from([activity] in query,
840 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
841 where:
842 fragment(
843 "not (?->'to' \\?| ?) or ? = ?",
844 activity.data,
845 ^mutes,
846 activity.actor,
847 ^user.ap_id
848 )
849 )
850
851 unless opts[:skip_preload] do
852 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
853 else
854 query
855 end
856 end
857
858 defp restrict_muted(query, _), do: query
859
860 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
861 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
862 domain_blocks = user.domain_blocks || []
863
864 following_ap_ids = User.get_friends_ap_ids(user)
865
866 query =
867 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
868
869 from(
870 [activity, object: o] in query,
871 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
872 where:
873 fragment(
874 "((not (? && ?)) or ? = ?)",
875 activity.recipients,
876 ^blocked_ap_ids,
877 activity.actor,
878 ^user.ap_id
879 ),
880 where:
881 fragment(
882 "recipients_contain_blocked_domains(?, ?) = false",
883 activity.recipients,
884 ^domain_blocks
885 ),
886 where:
887 fragment(
888 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
889 activity.data,
890 activity.data,
891 ^blocked_ap_ids
892 ),
893 where:
894 fragment(
895 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
896 activity.actor,
897 ^domain_blocks,
898 activity.actor,
899 ^following_ap_ids
900 ),
901 where:
902 fragment(
903 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
904 o.data,
905 ^domain_blocks,
906 o.data,
907 ^following_ap_ids
908 )
909 )
910 end
911
912 defp restrict_blocked(query, _), do: query
913
914 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
915 from(
916 activity in query,
917 where:
918 fragment(
919 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
920 activity.data,
921 ^[Constants.as_public()]
922 )
923 )
924 end
925
926 defp restrict_unlisted(query, _), do: query
927
928 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
929 from(activity in query, where: activity.id in ^ids)
930 end
931
932 defp restrict_pinned(query, _), do: query
933
934 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
935 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
936
937 from(
938 activity in query,
939 where:
940 fragment(
941 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
942 activity.data,
943 activity.actor,
944 ^muted_reblogs
945 )
946 )
947 end
948
949 defp restrict_muted_reblogs(query, _), do: query
950
951 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
952 from(
953 activity in query,
954 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
955 )
956 end
957
958 defp restrict_instance(query, _), do: query
959
960 defp restrict_filtered(query, %{user: %User{} = user}) do
961 case Filter.compose_regex(user) do
962 nil ->
963 query
964
965 regex ->
966 from([activity, object] in query,
967 where:
968 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
969 activity.actor == ^user.ap_id
970 )
971 end
972 end
973
974 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
975 restrict_filtered(query, %{user: user})
976 end
977
978 defp restrict_filtered(query, _), do: query
979
980 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
981
982 defp exclude_poll_votes(query, _) do
983 if has_named_binding?(query, :object) do
984 from([activity, object: o] in query,
985 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
986 )
987 else
988 query
989 end
990 end
991
992 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
993
994 defp exclude_chat_messages(query, _) do
995 if has_named_binding?(query, :object) do
996 from([activity, object: o] in query,
997 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
998 )
999 else
1000 query
1001 end
1002 end
1003
1004 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1005
1006 defp exclude_invisible_actors(query, _opts) do
1007 invisible_ap_ids =
1008 User.Query.build(%{invisible: true, select: [:ap_id]})
1009 |> Repo.all()
1010 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1011
1012 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1013 end
1014
1015 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1016 from(activity in query, where: activity.id != ^id)
1017 end
1018
1019 defp exclude_id(query, _), do: query
1020
1021 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1022
1023 defp maybe_preload_objects(query, _) do
1024 query
1025 |> Activity.with_preloaded_object()
1026 end
1027
1028 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1029
1030 defp maybe_preload_bookmarks(query, opts) do
1031 query
1032 |> Activity.with_preloaded_bookmark(opts[:user])
1033 end
1034
1035 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1036 query
1037 |> Activity.with_preloaded_report_notes()
1038 end
1039
1040 defp maybe_preload_report_notes(query, _), do: query
1041
1042 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1043
1044 defp maybe_set_thread_muted_field(query, opts) do
1045 query
1046 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1047 end
1048
1049 defp maybe_order(query, %{order: :desc}) do
1050 query
1051 |> order_by(desc: :id)
1052 end
1053
1054 defp maybe_order(query, %{order: :asc}) do
1055 query
1056 |> order_by(asc: :id)
1057 end
1058
1059 defp maybe_order(query, _), do: query
1060
1061 defp fetch_activities_query_ap_ids_ops(opts) do
1062 source_user = opts[:muting_user]
1063 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1064
1065 ap_id_relationships =
1066 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1067 [:block | ap_id_relationships]
1068 else
1069 ap_id_relationships
1070 end
1071
1072 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1073
1074 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1075 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1076
1077 restrict_muted_reblogs_opts =
1078 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1079
1080 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1081 end
1082
1083 def fetch_activities_query(recipients, opts \\ %{}) do
1084 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1085 fetch_activities_query_ap_ids_ops(opts)
1086
1087 config = %{
1088 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1089 }
1090
1091 Activity
1092 |> maybe_preload_objects(opts)
1093 |> maybe_preload_bookmarks(opts)
1094 |> maybe_preload_report_notes(opts)
1095 |> maybe_set_thread_muted_field(opts)
1096 |> maybe_order(opts)
1097 |> restrict_recipients(recipients, opts[:user])
1098 |> restrict_replies(opts)
1099 |> restrict_tag(opts)
1100 |> restrict_tag_reject(opts)
1101 |> restrict_tag_all(opts)
1102 |> restrict_since(opts)
1103 |> restrict_local(opts)
1104 |> restrict_actor(opts)
1105 |> restrict_type(opts)
1106 |> restrict_state(opts)
1107 |> restrict_favorited_by(opts)
1108 |> restrict_blocked(restrict_blocked_opts)
1109 |> restrict_muted(restrict_muted_opts)
1110 |> restrict_filtered(opts)
1111 |> restrict_media(opts)
1112 |> restrict_visibility(opts)
1113 |> restrict_thread_visibility(opts, config)
1114 |> restrict_reblogs(opts)
1115 |> restrict_pinned(opts)
1116 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1117 |> restrict_instance(opts)
1118 |> restrict_announce_object_actor(opts)
1119 |> restrict_filtered(opts)
1120 |> Activity.restrict_deactivated_users()
1121 |> exclude_poll_votes(opts)
1122 |> exclude_chat_messages(opts)
1123 |> exclude_invisible_actors(opts)
1124 |> exclude_visibility(opts)
1125 end
1126
1127 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1128 list_memberships = Pleroma.List.memberships(opts[:user])
1129
1130 fetch_activities_query(recipients ++ list_memberships, opts)
1131 |> Pagination.fetch_paginated(opts, pagination)
1132 |> Enum.reverse()
1133 |> maybe_update_cc(list_memberships, opts[:user])
1134 end
1135
1136 @doc """
1137 Fetch favorites activities of user with order by sort adds to favorites
1138 """
1139 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1140 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1141 user.ap_id
1142 |> Activity.Queries.by_actor()
1143 |> Activity.Queries.by_type("Like")
1144 |> Activity.with_joined_object()
1145 |> Object.with_joined_activity()
1146 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1147 |> order_by([like, _, _], desc_nulls_last: like.id)
1148 |> Pagination.fetch_paginated(
1149 Map.merge(params, %{skip_order: true}),
1150 pagination
1151 )
1152 end
1153
1154 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1155 Enum.map(activities, fn
1156 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1157 if Enum.any?(bcc, &(&1 in list_memberships)) do
1158 update_in(activity.data["cc"], &[user_ap_id | &1])
1159 else
1160 activity
1161 end
1162
1163 activity ->
1164 activity
1165 end)
1166 end
1167
1168 defp maybe_update_cc(activities, _, _), do: activities
1169
1170 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1171 from(activity in query,
1172 where:
1173 fragment("? && ?", activity.recipients, ^recipients) or
1174 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1175 ^Constants.as_public() in activity.recipients)
1176 )
1177 end
1178
1179 def fetch_activities_bounded(
1180 recipients,
1181 recipients_with_public,
1182 opts \\ %{},
1183 pagination \\ :keyset
1184 ) do
1185 fetch_activities_query([], opts)
1186 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1187 |> Pagination.fetch_paginated(opts, pagination)
1188 |> Enum.reverse()
1189 end
1190
1191 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1192 def upload(file, opts \\ []) do
1193 with {:ok, data} <- Upload.store(file, opts) do
1194 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1195
1196 Repo.insert(%Object{data: obj_data})
1197 end
1198 end
1199
1200 @spec get_actor_url(any()) :: binary() | nil
1201 defp get_actor_url(url) when is_binary(url), do: url
1202 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1203
1204 defp get_actor_url(url) when is_list(url) do
1205 url
1206 |> List.first()
1207 |> get_actor_url()
1208 end
1209
1210 defp get_actor_url(_url), do: nil
1211
1212 defp object_to_user_data(data) do
1213 avatar =
1214 data["icon"]["url"] &&
1215 %{
1216 "type" => "Image",
1217 "url" => [%{"href" => data["icon"]["url"]}]
1218 }
1219
1220 banner =
1221 data["image"]["url"] &&
1222 %{
1223 "type" => "Image",
1224 "url" => [%{"href" => data["image"]["url"]}]
1225 }
1226
1227 fields =
1228 data
1229 |> Map.get("attachment", [])
1230 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1231 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1232
1233 emojis =
1234 data
1235 |> Map.get("tag", [])
1236 |> Enum.filter(fn
1237 %{"type" => "Emoji"} -> true
1238 _ -> false
1239 end)
1240 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1241 {String.trim(name, ":"), url}
1242 end)
1243
1244 is_locked = data["manuallyApprovesFollowers"] || false
1245 capabilities = data["capabilities"] || %{}
1246 accepts_chat_messages = capabilities["acceptsChatMessages"]
1247 data = Transmogrifier.maybe_fix_user_object(data)
1248 is_discoverable = data["discoverable"] || false
1249 invisible = data["invisible"] || false
1250 actor_type = data["type"] || "Person"
1251
1252 public_key =
1253 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1254 data["publicKey"]["publicKeyPem"]
1255 else
1256 nil
1257 end
1258
1259 shared_inbox =
1260 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1261 data["endpoints"]["sharedInbox"]
1262 else
1263 nil
1264 end
1265
1266 user_data = %{
1267 ap_id: data["id"],
1268 uri: get_actor_url(data["url"]),
1269 ap_enabled: true,
1270 banner: banner,
1271 fields: fields,
1272 emoji: emojis,
1273 is_locked: is_locked,
1274 is_discoverable: is_discoverable,
1275 invisible: invisible,
1276 avatar: avatar,
1277 name: data["name"],
1278 follower_address: data["followers"],
1279 following_address: data["following"],
1280 bio: data["summary"] || "",
1281 actor_type: actor_type,
1282 also_known_as: Map.get(data, "alsoKnownAs", []),
1283 public_key: public_key,
1284 inbox: data["inbox"],
1285 shared_inbox: shared_inbox,
1286 accepts_chat_messages: accepts_chat_messages
1287 }
1288
1289 # nickname can be nil because of virtual actors
1290 if data["preferredUsername"] do
1291 Map.put(
1292 user_data,
1293 :nickname,
1294 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1295 )
1296 else
1297 Map.put(user_data, :nickname, nil)
1298 end
1299 end
1300
1301 def fetch_follow_information_for_user(user) do
1302 with {:ok, following_data} <-
1303 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1304 {:ok, hide_follows} <- collection_private(following_data),
1305 {:ok, followers_data} <-
1306 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1307 {:ok, hide_followers} <- collection_private(followers_data) do
1308 {:ok,
1309 %{
1310 hide_follows: hide_follows,
1311 follower_count: normalize_counter(followers_data["totalItems"]),
1312 following_count: normalize_counter(following_data["totalItems"]),
1313 hide_followers: hide_followers
1314 }}
1315 else
1316 {:error, _} = e -> e
1317 e -> {:error, e}
1318 end
1319 end
1320
1321 defp normalize_counter(counter) when is_integer(counter), do: counter
1322 defp normalize_counter(_), do: 0
1323
1324 def maybe_update_follow_information(user_data) do
1325 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1326 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1327 {_, true} <-
1328 {:collections_available,
1329 !!(user_data[:following_address] && user_data[:follower_address])},
1330 {:ok, info} <-
1331 fetch_follow_information_for_user(user_data) do
1332 info = Map.merge(user_data[:info] || %{}, info)
1333
1334 user_data
1335 |> Map.put(:info, info)
1336 else
1337 {:user_type_check, false} ->
1338 user_data
1339
1340 {:collections_available, false} ->
1341 user_data
1342
1343 {:enabled, false} ->
1344 user_data
1345
1346 e ->
1347 Logger.error(
1348 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1349 )
1350
1351 user_data
1352 end
1353 end
1354
1355 defp collection_private(%{"first" => %{"type" => type}})
1356 when type in ["CollectionPage", "OrderedCollectionPage"],
1357 do: {:ok, false}
1358
1359 defp collection_private(%{"first" => first}) do
1360 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1361 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1362 {:ok, false}
1363 else
1364 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1365 {:error, _} = e -> e
1366 e -> {:error, e}
1367 end
1368 end
1369
1370 defp collection_private(_data), do: {:ok, true}
1371
1372 def user_data_from_user_object(data) do
1373 with {:ok, data} <- MRF.filter(data) do
1374 {:ok, object_to_user_data(data)}
1375 else
1376 e -> {:error, e}
1377 end
1378 end
1379
1380 def fetch_and_prepare_user_from_ap_id(ap_id) do
1381 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1382 {:ok, data} <- user_data_from_user_object(data) do
1383 {:ok, maybe_update_follow_information(data)}
1384 else
1385 # If this has been deleted, only log a debug and not an error
1386 {:error, "Object has been deleted" = e} ->
1387 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1388 {:error, e}
1389
1390 {:error, {:reject, reason} = e} ->
1391 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1392 {:error, e}
1393
1394 {:error, e} ->
1395 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1396 {:error, e}
1397 end
1398 end
1399
1400 def maybe_handle_clashing_nickname(data) do
1401 with nickname when is_binary(nickname) <- data[:nickname],
1402 %User{} = old_user <- User.get_by_nickname(nickname),
1403 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1404 Logger.info(
1405 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1406 data[:ap_id]
1407 }, renaming."
1408 )
1409
1410 old_user
1411 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1412 |> User.update_and_set_cache()
1413 else
1414 {:ap_id_comparison, true} ->
1415 Logger.info(
1416 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1417 )
1418
1419 _ ->
1420 nil
1421 end
1422 end
1423
1424 def make_user_from_ap_id(ap_id) do
1425 user = User.get_cached_by_ap_id(ap_id)
1426
1427 if user && !User.ap_enabled?(user) do
1428 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1429 else
1430 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1431 if user do
1432 user
1433 |> User.remote_user_changeset(data)
1434 |> User.update_and_set_cache()
1435 else
1436 maybe_handle_clashing_nickname(data)
1437
1438 data
1439 |> User.remote_user_changeset()
1440 |> Repo.insert()
1441 |> User.set_cache()
1442 end
1443 end
1444 end
1445 end
1446
1447 def make_user_from_nickname(nickname) do
1448 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1449 make_user_from_ap_id(ap_id)
1450 else
1451 _e -> {:error, "No AP id in WebFinger"}
1452 end
1453 end
1454
1455 # filter out broken threads
1456 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1457 entire_thread_visible_for_user?(activity, user)
1458 end
1459
1460 # do post-processing on a specific activity
1461 def contain_activity(%Activity{} = activity, %User{} = user) do
1462 contain_broken_threads(activity, user)
1463 end
1464
1465 def fetch_direct_messages_query do
1466 Activity
1467 |> restrict_type(%{type: "Create"})
1468 |> restrict_visibility(%{visibility: "direct"})
1469 |> order_by([activity], asc: activity.id)
1470 end
1471 end