0f839af104c7390cb59c2fa7b1bfe11ea80fcd37
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.ActivityPub.ActivityPub.Persisting do
  @moduledoc """
  Behaviour for modules that persist an ActivityPub activity or object.
  """

  # The implementation in `Pleroma.Web.ActivityPub.ActivityPub` returns the
  # persisted struct together with the (unchanged) `meta` keyword list, and
  # passes through whatever its failing `with` step produced on failure.
  # Types are fully qualified because this module has no aliases.
  @callback persist(map(), keyword()) ::
              {:ok, Pleroma.Activity.t() | Pleroma.Object.t(), keyword()} | {:error, any()}
end
8
9 defmodule Pleroma.Web.ActivityPub.ActivityPub do
10 alias Pleroma.Activity
11 alias Pleroma.Activity.Ir.Topics
12 alias Pleroma.Config
13 alias Pleroma.Constants
14 alias Pleroma.Conversation
15 alias Pleroma.Conversation.Participation
16 alias Pleroma.Filter
17 alias Pleroma.Maps
18 alias Pleroma.Notification
19 alias Pleroma.Object
20 alias Pleroma.Object.Containment
21 alias Pleroma.Object.Fetcher
22 alias Pleroma.Pagination
23 alias Pleroma.Repo
24 alias Pleroma.Upload
25 alias Pleroma.User
26 alias Pleroma.Web.ActivityPub.MRF
27 alias Pleroma.Web.ActivityPub.Transmogrifier
28 alias Pleroma.Web.Streamer
29 alias Pleroma.Web.WebFinger
30 alias Pleroma.Workers.BackgroundWorker
31
32 import Ecto.Query
33 import Pleroma.Web.ActivityPub.Utils
34 import Pleroma.Web.ActivityPub.Visibility
35
36 require Logger
37 require Pleroma.Constants
38
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
40
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}` where `recipients` combines "to", "cc" and
# "bcc". For "Create" activities the actor is added as well and duplicates
# are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 maps a missing (nil) actor to [] so that no placeholder
  # value ends up among the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# An activity without an actor passes the check.
defp check_actor_is_active(nil), do: true

# A known actor passes unless it is deactivated; an unknown actor fails.
defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
66
# True when the object's content fits within the configured
# `[:instance, :remote_limit]`, or when there is no content to check.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
73
@doc """
Increases the note count of `actor` when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
77
@doc """
Decreases the note count of `actor` when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
81
# Bumps the replies counter of the replied-to object for public "Create"
# activities whose object has an "inReplyTo". Anything else is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = note
     }) do
  if is_public?(note), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
92
# Types that are stored as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  # `:local` is mandatory in meta; Keyword.fetch!/2 raises when it is absent.
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
117
@doc """
Validates, filters and stores an activity map.

If `Activity.normalize/1` already finds an activity for `map`, that activity
is returned. With `fake` set, an unsaved `%Activity{}` with a placeholder id
is returned instead of inserting anything.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {:actor_check, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _to, _cc} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, stored} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(stored, :object, object)

    # Rich media is fetched in a separate task, bounded by the limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:error, _} = failure ->
      failure

    {:reject, _} = rejection ->
      {:error, rejection}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = failure ->
      failure
  end
end
170
# Inserts the activity and, on success, schedules its expiration if the
# data asks for one. An insert failure is returned untouched.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
183
@doc """
Creates notifications for `activity`, creates or bumps its conversation and
streams both the activity and the conversation participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
192
# Enqueues a purge job when the activity data carries a `DateTime` under
# "expires_at"; otherwise the activity is returned as `{:ok, activity}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
206
# Creates or bumps the conversation for the activity and marks it as read
# for the acting user. Any non-matching intermediate result is returned as is.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
214
# Loads the participations of a conversation, forcing a reload of the
# association. Anything other than `{:ok, conversation}` yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
222
@doc """
Streams the given participations (with their users preloaded) on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

@doc """
Streams the participations of the conversation belonging to the object's
context, provided a direct activity is still found in it for `user`.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
248
@doc """
Streams "Create", "Announce" and "Delete" activities to their topics.

Other activities are not streamed and yield `:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
259
@doc """
Creates a "Create" activity inside a database transaction.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
266
# Transaction body of create/2: builds the Create data, inserts it and then
# updates counters, notifies, streams and federates. Fake activities and
# benchmark runs stop early; errors roll the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_create_data(additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
299
@doc """
Builds and inserts a listen activity, then notifies, streams and federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(additional)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
319
@doc """
Runs the unfollow of `followed` by `follower` inside a database transaction.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
328
# Transaction body of unfollow/4. Marks the latest follow activity as
# "cancelled" and inserts the unfollow activity. Yields nil when there is no
# follow activity and rolls back on errors.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
342
@doc """
Creates a report ("Flag") activity inside a database transaction.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
349
# Transaction body of flag/1: inserts the report, federates a stripped copy
# and mails every superuser that has an email address.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  # The reported account is only addressed when the report is forwarded.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
391
@doc """
Inserts a "Move" activity from `origin` to `target` and enqueues the
"move_following" background job.

The target must list the origin's AP id in `also_known_as`.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    with {:ok, activity} <- insert(move_data, local) do
      notify_and_stream(activity)
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
417
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are limited to the public address plus, when `opts[:user]` is
given, that user and the users they follow.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
447
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
454
@doc """
Returns the id of the newest direct activity in `context`, or `nil`.

Preloading is skipped unless `opts` sets `:skip_preload` itself.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, :skip_preload, true)

  latest_id_query =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})
    |> limit(1)
    |> select([a], a.id)

  Repo.one(latest_id_query)
end
465
@doc """
Fetches a page of activities addressed to the public collection.

Any `:user` in `opts` is dropped before the query is built.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
475
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
482
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility equals (or is
# one of) `opts[:visibility]`.
#
# An invalid visibility is logged and the query is returned unrestricted,
# mirroring exclude_visibility/2. Returning the logger result here would hand
# `:ok` to the next step of the query pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# An explicit `visibility: nil` means "no restriction" and falls through to
# the catch-all clause instead of being reported as an error.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
519
# Drops activities whose computed visibility equals (or is one of)
# `opts[:exclude_visibilities]`. Invalid values are logged and ignored.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
562
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped by the instance config or by the user.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
577
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})
  recipients = user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
592
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

Block and mute filtering for the reader is only applied when the reader does
not block `user`.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients = user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
618
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, using
offset pagination.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])
  recipients = user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
630
# Recipient list used when reading a user's activities. In godmode the list
# is empty (restrict_recipients/3 then applies no restriction); otherwise it
# holds the public address plus, for a reader, the reader and their follows.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reader}) do
  public = Constants.as_public()

  if reader do
    [public, reader.ap_id | User.following(reader)]
  else
    [public]
  end
end
640
# Hides announces of objects authored by `opts[:announce_filtering_user]`.
# Needs the joined object, hence it raises when preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
660
# Keeps only activities with an id greater than `opts[:since_id]`.
# An empty-string id is treated as "not given".
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
668
# Drops activities whose object carries any of the rejected tags.
# Needs the joined object, hence it raises when preloading is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
681
# Keeps only activities whose object carries every one of the given tags.
# Needs the joined object, hence it raises when preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
694
# Keeps only activities whose object carries the given tag (binary) or any
# of the given tags (list). Needs the joined object, hence it raises when
# preloading is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
714
# Keeps activities addressed to any of `recipients`. With a user, the
# user's own activities are additionally let through via OR. An empty
# recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
728
# Keeps only local activities when `local_only: true` is requested.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
734
# Keeps only activities whose actor equals `opts[:actor_id]`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
740
# Keeps only activities of the given type. A binary is compared directly;
# any other value is passed to SQL `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
750
# Keeps only activities whose data "state" equals `opts[:state]`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
756
# Keeps only activities whose object lists `ap_id` among its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
765
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty attachment list. Needs the joined object, hence it raises when
# preloading is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
779
# Reply filtering:
#   * `exclude_replies: true` drops every reply,
#   * `reply_visibility: "self"` keeps replies addressed to the filtering user,
#   * `reply_visibility: "following"` keeps replies addressed to the filtering
#     user or their friends, plus the user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create'     -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
                                 -- unless they are the author (because authors
                                 -- are also part of the recipients). This leads
                                 -- to a bug that self-replies by friends won't
                                 -- show up.
      OR ? = ?                   -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
833
# Drops "Announce" activities when `exclude_reblogs: true` is requested.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
839
# Hides activities by, or addressed to, users muted by `opts[:muting_user]`
# (the user's own activities stay visible). Unless preloading is skipped,
# rows matching the `thread_mute` binding are hidden as well.
# `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^muted_ap_ids,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
866
# Hides activities involving users or domains blocked by
# `opts[:blocking_user]`. Joins the object when the query has no `:object`
# binding yet, because the last condition inspects the object's actor.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # Activities authored by a blocked user.
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # Activities addressed to a blocked user, unless authored by the blocker.
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  # Announces addressed to a blocked user.
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # Actors on a blocked domain, unless they are in `following_ap_ids`.
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same domain check for the actor of the joined object.
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
920
# With `restrict_unlisted: true`, drops activities that carry the public
# address in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
934
# With `pinned: true`, keeps only the activities listed in
# `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _), do: query
940
# Hides announces made by users whose reblogs `opts[:muting_user]` has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
957
# Keeps only activities whose actor URL has `instance` as its host part.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _), do: query
966
# Hides activities whose object content matches the regex composed from the
# user's filters; the user's own activities stay visible. When only a
# `blocking_user` is given, that user's filters are used.
defp restrict_filtered(query, %{user: %User{} = user}) do
  filter_regex = Filter.compose_regex(user)

  if is_nil(filter_regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^filter_regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
986
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true`. Only possible when the object is joined.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
998
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true`. Only possible when the object is joined.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
1010
# Drops activities authored by invisible users, unless
# `invisible_actors: true`. The invisible AP ids are loaded with a separate
# query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: id} -> id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1021
# Drops the activity with the given id when `exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _), do: query
1027
# Preloads the object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1034
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1041
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1048
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1055
# Orders by id when `opts[:order]` is `:desc` or `:asc`; otherwise no
# ordering is added.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1067
# Loads the mute / reblog-mute (and, when the blocking user is the muting
# user, block) AP ids in one call and returns three option maps carrying
# them for restrict_blocked/2, restrict_muted/2 and restrict_muted_reblogs/2.
# Keys already present in `opts` are not overwritten.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
1089
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion selected by `opts`.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second application only repeats the same WHERE
  # condition and the Filter.compose_regex/1 call.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1133
@doc """
Fetches one page of activities for `recipients`.

The values returned by `Pleroma.List.memberships/1` for `opts[:user]` are
appended to `recipients` before the query is built. The page returned by
`Pagination.fetch_paginated/3` is reversed, and activities that reached the
user through one of those memberships get the user added to their `"cc"`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  memberships = Pleroma.List.memberships(user)

  (recipients ++ memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(memberships, user)
end
1142
@doc """
Fetches the activities `user` has favourited, most recent favourite first.

The query starts from the user's `Like` activities and selects the liked
activity together with its object. The id of the `Like` is exposed as
`pagination_id`, and pagination is told to skip its own ordering so the
order set here is kept.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  pagination_params = Map.put(params, :skip_order, true)

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1160
# Adds the viewing user's ap_id to the "cc" of every activity whose non-empty
# "bcc" contains at least one of `list_memberships`.
#
# Activities are returned unchanged when there are no memberships, when no
# `%User{}` is given, or when the activity's "bcc" is missing or empty.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (update_in then passes nil) or a bare value.
        # Wrapping it keeps the result a proper list instead of `[id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1176
# Narrows `query` to activities matching either condition:
#   * `activity.recipients && recipients`, or
#   * `activity.recipients && recipients_with_public` and the public address
#     (`Constants.as_public/0`) is among the activity's recipients.
# NOTE(review): `&&` is presumably the Postgres array-overlap operator here;
# confirm against the column type of `recipients`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1185
@doc """
Fetches one page of activities using the bounded recipient rules.

The base query is built with an empty recipient list and `opts`, then
narrowed by `recipients` and `recipients_with_public`. The page returned by
`Pagination.fetch_paginated/3` is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1197
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` for it.

When `opts[:actor]` is present it is recorded under `"actor"` in the object
data. Anything other than `{:ok, data}` from the store step is returned
unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1206
# Extracts a URL string from the actor's "url" property.
#
# Accepts a plain string, a map with a binary "href", or a list, in which
# case only the first element is considered. Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1218
# Converts a remote actor document (a string-keyed map) into the atom-keyed
# attribute map used to create or update the matching user.
#
# The document is remote input, so optional parts are read defensively:
# malformed "icon"/"image", "attachment", "tag" or "capabilities" values are
# ignored instead of raising. Well-formed documents produce the same result
# as before.
defp object_to_user_data(data) do
  # Wraps an "icon"/"image" value into the stored Image shape. Anything that
  # is not a map with a usable "url" yields nil.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  # List.wrap/1 covers a missing/null value as well as a single object that is
  # not wrapped in a list. Entries that are not PropertyValue maps are skipped.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Only Emoji tags carrying both an icon url and a string name are usable.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false

  accepts_chat_messages =
    case data["capabilities"] do
      %{"acceptsChatMessages" => value} -> value
      _ -> nil
    end

  # Everything above reads the document as received; everything below reads
  # it after Transmogrifier.maybe_fix_user_object/1 has been applied.
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1307
@doc """
Fetches the remote following and follower collections of `user`.

Reads `user.following_address` and `user.follower_address`, fetches both
collections, and returns `{:ok, map}` with the two counters (taken from
`"totalItems"`, `0` when that is not an integer) and the two
`hide_follows` / `hide_followers` flags.

Any failing step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_information = %{
      hide_follows: hide_follows,
      hide_followers: hide_followers,
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"])
    }

    {:ok, follow_information}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1327
# Returns the counter unchanged when it is an integer, otherwise 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1330
@doc """
Merges remote follow information into `user_data` when synchronisation applies.

All three gates must pass before anything is fetched:

  * `[:instance, :external_user_synchronization]` is `true`,
  * `user_data[:type]` is `"Person"` or `"Service"`,
  * both `:following_address` and `:follower_address` are set.

On success the fetched values are merged into `user_data[:info]`. In every
other case `user_data` is returned unchanged; anything other than a gate
evaluating to `false` is also logged as an error.
"""
# NOTE(review): the type gate reads `user_data[:type]`, while
# object_to_user_data/1 in this module emits `:actor_type` and no `:type`.
# Confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged_info)
  else
    {gate, false} when gate in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1361
# Decides whether a follow collection is private, returning `{:ok, boolean}`
# or `{:error, reason}`.
#
#   * "first" is an embedded (Ordered)CollectionPage -> not private.
#   * "first" is anything else -> it is fetched. Getting a page back means
#     not private; an error wrapping a 401/403 response means private; other
#     results are returned as errors.
#   * no "first" at all -> private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1378
@doc """
Runs the actor document through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`. Any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1386
@doc """
Fetches the actor at `ap_id` and turns it into user data.

On success returns `{:ok, user_data}`, after giving
`maybe_update_follow_information/1` the chance to extend it. Errors are
returned as `{:error, reason}` and logged at a level matching the reason:
debug for deleted objects, info for rejections, error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      {:error, reason}
  end
end
1406
@doc """
Renames an existing user that already holds the nickname in `data`.

When another user (different ap_id) owns `data[:nickname]`, that user's
nickname is changed to `"<id>.<nickname>"` and the result of the update is
returned. When the existing user has the same ap_id nothing is changed and
only a log line is written. Returns `nil` when `data` has no string nickname
or no user holds it.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1430
@doc """
Creates or refreshes the user identified by `ap_id`.

  * A cached user for which `User.ap_enabled?/1` is false is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Otherwise the actor is fetched. An existing user is updated from the
    fetched data; a new user is inserted after any nickname clash with
    another account has been resolved.

A failed fetch is returned as-is.
"""
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)

  cond do
    existing && !User.ap_enabled?(existing) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        if existing do
          changeset = User.remote_user_changeset(existing, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end
      end
  end
end
1453
@doc """
Resolves `nickname` through WebFinger and creates or refreshes that user.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not return
`{:ok, map}` with a non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1461
# Filters out broken threads: returns whatever
# entire_thread_visible_for_user?/2 reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1466
@doc """
Post-processes a single activity for `user`.

Currently this only applies the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1471
@doc """
Builds the query for direct messages: `Create` activities with `"direct"`
visibility, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{type: "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [activity], asc: activity.id)
end
1478 end