Merge branch 'cachex-test' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`, where `recipients` is the concatenation of
# the "to", "cc" and "bcc" lists (each defaulting to `[]`). For "Create"
# activities the actor is appended as well and the result is de-duplicated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. The previous `[]` default was
  # wrapped as `[actor]`, which put an empty list *element* into the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
53
# Decides whether an activity by `actor` may be inserted. A `nil` actor is
# accepted; otherwise `User.get_cached_by_ap_id/1` must return a user that is
# not deactivated.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
62
# Checks the object's "content" length against the `[:instance, :remote_limit]`
# setting. Activities without (non-nil) content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
69
@doc """
Increases the note count of `actor` when `object` is public; otherwise returns
`{:ok, actor}` without touching the counter.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
73
@doc """
Decreases the note count of `actor` when `object` is public; otherwise returns
`{:ok, actor}` without touching the counter.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
77
# For a "Create" whose object carries an "inReplyTo", bumps the replies counter
# of the replied-to object when the new object is public. Anything else is a
# no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => reply_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(reply_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
88
# Types that are stored as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

# Persists one of the `@object_types` via `Object.create/1` and hands back the
# created object together with the untouched `meta`.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Persists anything else as an activity. `meta` must contain `:local`
# (`Keyword.fetch!/2` raises otherwise).
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
113
@doc """
Inserts the activity described by `map`.

If `Activity.normalize/1` already finds an activity for `map`, that activity is
returned. Otherwise the data goes through the actor check (skippable with
`bypass_actor_check`), the remote length limit, the MRF filter and the
containment check before the object and the activity are stored.

With `fake` set to `true` nothing is stored; an in-memory activity with the id
`"pleroma:fakeid"` is returned instead.

Failures: `{:error, false}` for a failed actor check, `{:error, :remote_limit}`
for over-long content, `{:error, {:reject, _}}` for an MRF rejection; a
`{:containment, _}` tuple and any other `{:error, _}` are passed through as-is.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       with_defaults = lazy_put_activity_defaults(map, fake),
       {:actor_check, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(with_defaults["actor"])},
       {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(with_defaults)},
       {:ok, filtered} <- MRF.filter(with_defaults),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored_data, object} <- insert_full_object(filtered),
       {:ok, stored} <- insert_activity_with_expiration(stored_data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(stored, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # An activity for this data already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = containment_error ->
      containment_error

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error
  end
end
166
# Stores a new activity built from `data` and, once stored, schedules its
# expiration if the data asks for one.
defp insert_activity_with_expiration(data, local, recipients) do
  new_activity = %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }

  case Repo.insert(new_activity) do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
179
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
188
# Enqueues a `PurgeExpiredActivity` job when the activity data carries a
# `DateTime` under "expires_at"; otherwise returns `{:ok, activity}` directly.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
202
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`, or whatever the failing step
# returned.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
210
# Returns the freshly reloaded participations of a conversation, or `[]` when
# no conversation was produced.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
218
@doc """
Preloads the user of each participation and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
226
@doc """
Streams out the participations of the conversation that `object`'s context
belongs to, provided `user` can still see a direct activity in that context.
Returns `:noop` for anything that is not an object with a context.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
244
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Every other activity is a `:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
255
@doc """
Runs the creation of a "Create" activity inside a database transaction and
unwraps the transaction's result.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
262
# Builds and inserts a "Create" activity, then updates counters, notifies,
# streams and federates. Fake activities and benchmark-mode ("quick insert")
# runs stop early with `{:ok, activity}`; an `{:error, _}` rolls the
# surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_create_data(params[:additional] || %{})

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ = increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {tag, true, activity} when tag in [:quick_insert, :fake] ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
295
@doc """
Builds a listen activity from `params`, inserts it, then notifies, streams and
federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(params[:additional] || %{})

  with {:ok, activity} <- insert(listen_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
315
@doc """
Runs the unfollow of `followed` by `follower` inside a database transaction and
unwraps the transaction's result.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
324
# Cancels the latest follow activity between the two users and inserts,
# notifies, streams and federates the matching unfollow activity. Returns `nil`
# when there is no follow to cancel; an `{:error, _}` rolls the transaction
# back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
338
@doc """
Runs the creation of a report (flag) activity inside a database transaction and
unwraps the transaction's result.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
345
# Inserts a report activity, notifies and streams it, federates a stripped
# copy, and e-mails the report to every superuser that has an e-mail address.
# The reported account is only put in "cc" when forwarding is not explicitly
# disabled. An `{:error, _}` rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
387
@doc """
Inserts a "Move" activity from `origin` to `target`, federates it and enqueues
the "move_following" background job.

The target must list the origin's AP id in `also_known_as`; otherwise an
`{:error, message}` tuple is returned.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  job_args = %{"origin_id" => origin.id, "target_id" => target.id}

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ = notify_and_stream(activity) do
    maybe_federate(activity)
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
413
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Visible recipients are the public collection plus, when `opts[:user]` is set,
that user and everyone they follow.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      no_user when no_user in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
443
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
450
@doc """
Returns the id of the newest direct-visibility activity in `context`, or `nil`
when there is none. Preloading is skipped unless `opts` sets `:skip_preload`
itself.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Caller-supplied options win over the default.
  query_opts = Map.put_new(opts, :skip_preload, true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
461
@doc """
Fetches a page of activities addressed to the public collection. Any `:user`
in `opts` is dropped before the query is built.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], opts)

  query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
471
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
478
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose `activity_visibility(...)` equals the
# requested visibility (or is among a list of visibilities).
#
# An unknown visibility is logged and the query is returned unchanged, the same
# way `exclude_visibility/2` does. Previously these branches returned the
# result of `Logger.error/1` instead of a query, which broke every later step
# of the pipeline. `nil` and a missing key mean "no restriction".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # `inspect/1` because the list may hold terms that cannot be interpolated.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
515
# Drops activities whose `activity_visibility(...)` equals the given visibility
# (or is among a list of visibilities). Unknown visibilities are logged and the
# query is returned unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
558
# Applies the `thread_visibility(...)` SQL check for the reading user, unless
# thread containment is skipped by the config or by the user.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
573
@doc """
Fetches the activities authored by `user` as seen by `reading_user`, without
restricting the activity type.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
588
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`. Block and mute filtering for the reader is applied only when
the reader does not block `user`.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
614
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, using
offset pagination.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
626
# Recipient list used when listing a user's activities: empty in godmode (which
# disables recipient restriction), otherwise the public collection plus, for a
# signed-in reader, the reader and everyone they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  case reading_user do
    no_reader when no_reader in [nil, false] ->
      [Constants.as_public()]

    reader ->
      [Constants.as_public(), reader.ap_id | User.following(reader)]
  end
end
636
# Hides "Announce" activities whose announced object was authored by the
# filtering user. Needs the joined object, hence the raise on `skip_preload`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
656
# Keeps only activities with an id greater than `:since_id`; an empty string
# counts as "not given".
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
664
# Drops activities whose object carries any of the rejected tags. Needs the
# joined object, hence the raise on `skip_preload`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
677
# Keeps only activities whose object carries all of the given tags. Needs the
# joined object, hence the raise on `skip_preload`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
690
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, hence the raise on
# `skip_preload`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
710
# Keeps activities whose recipients overlap `recipients`. With a user given,
# activities authored by that user are admitted as well (OR-ed in). An empty
# recipient list disables the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
724
# Keeps only local activities when `:local_only` is `true`.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
730
# Keeps only activities authored by `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
736
# Keeps only activities of the given type. A binary is compared for equality;
# any other value is matched with `= ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
746
# Keeps only activities whose data "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _), do: query
752
# Keeps only activities whose object lists `:favorited_by` among its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
761
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, hence the raise on
# `skip_preload`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
775
# Reply filtering:
#   * `exclude_replies: true` drops every reply;
#   * `reply_visibility: "self"` keeps replies only when the filtering user is
#     among the recipients;
#   * `reply_visibility: "following"` keeps replies addressed to the filtering
#     user or their friends, plus the user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
829
# Drops "Announce" activities when `:exclude_reblogs` is `true`.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _), do: query
835
# Hides activities by, or addressed to, users muted by `:muting_user` (the
# user's own activities stay visible). Unless preloading is skipped, activities
# with a thread-mute row are hidden too. `with_muted: true` disables all of
# this.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    # The :thread_mute binding is not joined when preloading is skipped.
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
862
# Hides activities involving users or domains blocked by `:blocking_user`:
# blocked actors, activities addressed to blocked users (except the user's
# own), recipients on blocked domains, announces addressed to blocked users,
# and actors or object authors on blocked domains unless the user follows them.
# Joins the object when the query does not have an :object binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
916
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
930
# With `pinned: true`, keeps only the activities listed in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _), do: query
936
# Hides "Announce" activities from actors whose reblogs `:muting_user` muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
953
# Keeps only activities whose actor URL has `:instance` as its host part.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
962
# Hides activities whose object content matches the regex composed from the
# user's filters; the user's own activities stay visible. Falls back to
# `:blocking_user` when no `:user` is given.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
982
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true` is set or the query has no :object binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
994
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true` is set or the query has no :object binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1006
# Drops activities authored by invisible users, unless `invisible_actors: true`
# is set. The AP ids of the invisible users are loaded with a separate query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1017
# Drops the activity with the given binary id, when `:exclude_id` is set.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _), do: query
1023
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1030
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1037
# Preloads report notes only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1044
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1051
# Orders by id when `:order` is `:desc` or `:asc`; otherwise leaves the query
# unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1063
# Loads the muting user's mute, reblog-mute and (when the blocking user is the
# same user) block relationships with one call, and returns three option maps
# for `restrict_blocked/2`, `restrict_muted/2` and `restrict_muted_reblogs/2`.
# Values already present in `opts` take precedence over the preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  same_blocking_user? = opts[:blocking_user] && opts[:blocking_user] == source_user

  ap_id_relationships =
    if same_blocking_user?, do: [:block | mute_relationships], else: mute_relationships

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
1085
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion that `opts` asks for.

Block, mute and reblog-mute restrictions get option maps enriched with
preloaded relationship AP ids. Thread containment follows the
`[:instance, :skip_thread_containment]` setting.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second, identical call further down only duplicated
  # the WHERE clause and recomputed the filter regex.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1129
@doc """
Fetches one page of activities addressed to `recipients` or to any list
membership of `opts[:user]`.

The paginated result is reversed before being returned, and activities that
reached the user through a list are passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, user)
end
1138
@doc """
Fetches the activities favourited by `user`, newest favourite first.

Results are ordered by the id of the `Like` activity (not of the favourited
activity), and that id is exposed as `pagination_id` so pagination follows the
order in which items were added to the favourites.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  favourites_query =
    from([like, object, activity] in likes_with_targets,
      select: %{activity | object: object, pagination_id: like.id},
      order_by: [desc_nulls_last: like.id]
    )

  # Ordering is already set above, so pagination is told not to add its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1156
# Prepends the user's ap_id to the "cc" of every activity whose "bcc" contains one
# of the user's list memberships. Activities are returned unchanged when there are
# no list memberships, when no user is given, or when "bcc" is empty or absent.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a single value; List.wrap/1 keeps the result
        # a proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1172
# Keeps activities whose recipients overlap `recipients`, or whose recipients overlap
# `recipients_with_public` and also include the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1181
@doc """
Fetches one page of activities using the bounded recipient restriction.

The base query is built without recipients; the recipient condition is then
added by `fetch_activities_bounded_query/3`. The paginated result is reversed
before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1193
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the stored
# data, tagged with `opts[:actor]` when one is given. Any non-`{:ok, _}` result of
# the store step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object = %Object{data: Maps.put_if_present(stored, "actor", opts[:actor])}
      Repo.insert(object)

    failure ->
      failure
  end
end
1202
# Extracts a URL string from an actor's "url" value, which may be a plain string,
# a map with a string "href", or a list of either (the first element wins).
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1214
# Converts a remote actor object (string-keyed map) into the atom-keyed attribute map
# that the callers in this module pass on to the user changesets.
#
# `:nickname` is "preferredUsername@<host of id>", or nil when the actor has no
# preferredUsername (virtual actors).
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # "attachment" is remote input: it may be absent, nil, a single map, or contain
  # entries without a "type". Only PropertyValue maps are kept; everything else is
  # skipped instead of raising.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Same tolerance for "tag": only Emoji tags carrying both a name and an icon url
  # contribute; later entries with the same shortcode override earlier ones.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # These two are read before the Transmogrifier fix-up below, as before.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  data = Transmogrifier.maybe_fix_user_object(data)

  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1303
@doc """
Fetches the remote following and followers collections of `user` and returns
`{:ok, info}` with their counters and whether each collection is private.

Counters that are not integers are reported as 0. Any failing step is returned
as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1323
# Returns integer counters unchanged and replaces any other value with 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1326
@doc """
Merges freshly fetched follow information into `user_data[:info]` when external
user synchronization is enabled, the actor type is "Person" or "Service", and
both collection addresses are present. In every other case, including fetch
failures (which are logged), `user_data` is returned unchanged.
"""
# NOTE(review): the type check reads `user_data[:type]`, while `object_to_user_data/1`
# in this module emits `:actor_type` — for data coming from there the check looks
# like it can never pass. Confirm which key is intended before relying on this.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition was not met: nothing to update, nothing to log.
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1357
# Decides whether a remote collection is private, returning `{:ok, boolean}`.
#
# * an embedded first page means the collection is readable    -> `{:ok, false}`
# * a referenced first page is fetched; success                -> `{:ok, false}`,
#   a 401/403 response                                         -> `{:ok, true}`,
#   any other outcome                                          -> `{:error, reason}`
# * no "first" at all                                          -> `{:ok, true}`
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1374
@doc """
Runs the actor object through `MRF.filter/1` and converts the accepted object
into user attributes. Any other filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, accepted} -> {:ok, object_to_user_data(accepted)}
    rejected -> {:error, rejected}
  end
end
1382
@doc """
Fetches the remote actor at `ap_id` and returns `{:ok, user_data}` ready for a
user changeset, with follow information merged in when available.

Failures are returned as `{:error, reason}` after being logged at a level that
depends on the reason: debug for deleted objects, info for rejections, error
otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      {:error, reason}
  end
end
1402
@doc """
Frees up `data[:nickname]` when an existing user with a different ap_id already
holds it, by renaming that existing user to "<id>.<nickname>".

Returns nil when there is no nickname or no clashing user. When the existing
user has the same ap_id, nothing is changed and only a log line is written.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed_attrs = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

      old_user
      |> User.remote_user_changeset(renamed_attrs)
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1426
@doc """
Creates or refreshes the user identified by `ap_id`.

* a cached user that is not AP-enabled is upgraded through the Transmogrifier;
* otherwise the remote actor is fetched, and the result either updates the
  cached user or is inserted as a new user (after resolving nickname clashes).

A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)

  cond do
    existing && !User.ap_enabled?(existing) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if existing do
            existing
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
          else
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          failure
      end
  end
end
1449
@doc """
Resolves `nickname` through WebFinger and builds the user from the returned
ap_id. Returns `{:error, "No AP id in WebFinger"}` when no ap_id is found.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1457
# Filters out broken threads: passes only when `entire_thread_visible_for_user?/2`
# holds for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1462
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1467
@doc """
Returns a query for "Create" activities with direct visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1474 end