Merge branch 'docs_use_include_for_install_further_reading' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw values of the
# corresponding keys (defaulting to `[]`) and `recipients` is the combined
# list of `to`, `cc` and `bcc`.
#
# For "Create" activities the actor is also added to the recipients and the
# combined list is de-duplicated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. The previous `[]` default was
  # wrapped into `[[]]`, which leaked an empty list into the recipients.
  actor =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# All other activity types: plain concatenation, no actor, no de-duplication.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
51
# Tells whether the actor identified by `actor` (an AP id) may act.
# No actor at all counts as active.
defp check_actor_is_active(nil), do: true

# A cached user is active unless flagged `deactivated`; anything else
# returned by the lookup counts as inactive.
defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
60
# Checks the object's content length against the `[:instance, :remote_limit]`
# setting. Data without non-nil object content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
67
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
defp increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
71
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1` in that case, and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
75
# For a "Create" whose object carries an "inReplyTo" key, bumps the replies
# counter of the referenced object, but only if the new object is public
# (returns nil otherwise). Any other data is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
86
# A "Create" whose object has both "inReplyTo" and "name" is handled as a
# poll vote: the vote count of the referenced object is increased for the
# option `name` on behalf of `actor`. Any other data is a no-op.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => actor,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, actor)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
96
@object_types ["ChatMessage"]

@doc """
Stores `object` and hands `meta` back to the caller.

Data whose `"type"` is listed in `@object_types` is created through
`Object.create/1`; everything else is inserted as an `Activity`, with its
recipients computed from the data and `local` taken from `meta`.

Returns `{:ok, record, meta}` on success. On failure the non-matching result
of `Object.create/1` / `Repo.insert/1` is returned unchanged.

Raises `KeyError` if an activity is persisted without a `:local` key in `meta`.
"""
# The spec previously advertised a 2-tuple although both clauses return
# `{:ok, record, meta}`.
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
118
@doc """
Validates, filters and stores an activity.

Steps, in order: return the already-known activity if `Activity.normalize/1`
finds one; fill in defaults; check that the actor is active (unless
`bypass_actor_check`); check the remote content limit; run the MRF filter;
compute recipients; stop early for `fake` inserts; run the containment check;
insert the child object; insert the activity itself.

With `fake` set, nothing is written: an in-memory activity with the id
`"pleroma:fakeid"` is returned instead.

Any step that fails is reported as `{:error, failing_value}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data) do
    new_activity = %Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    }

    {:ok, activity} = maybe_create_activity_expiration(Repo.insert(new_activity))

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity was already known; hand it back as-is.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct without touching the database.
    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        id: "pleroma:fakeid",
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
166
@doc """
Creates notifications for `activity`, creates or bumps its conversation, then
streams out the activity followed by the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
175
# When a freshly inserted activity carries an "expires_at" value, registers an
# ActivityExpiration for it. A failure of that registration is returned in
# place of the activity. Every other input is passed through untouched.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
183
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either the conversation
# step or the user lookup does not succeed, that step's result is returned.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    failure ->
      failure
  end
end
191
# Loads (force-reloading) the participations of a successfully resolved
# conversation; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
199
@doc """
Preloads the `:user` of the given participations and pushes them to the
`"participation"` stream.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
207
@doc """
Streams out the participations of the conversation that belongs to the
object's `"context"`.

Nothing is streamed unless a conversation exists for that context and a latest
direct activity id is found for it with `user` as both viewer and blocking
user. Objects without a context are a no-op.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    no_conversation ->
      no_conversation
  end
end

def stream_out_participations(_, _), do: :noop
225
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed
for them. Every other activity is ignored and `:noop` is returned.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
236
@doc """
Builds and inserts a "Create" activity inside a database transaction.

On commit the inner result is unwrapped and returned; otherwise the
transaction's own result is returned.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
243
# Transaction body of `create/2`.
#
# After inserting the activity it updates reply and poll counters, bumps the
# actor's note count, notifies/streams and federates. Fake inserts stop right
# after the insert; when `[:env]` is `:benchmark` everything after the
# reply/poll counters is skipped. `{:error, _}` results roll the
# transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} -> {:ok, inserted}
    {:fake, true, faked} -> {:ok, faked}
    {:error, message} -> Repo.rollback(message)
  end
end
277
@doc """
Builds listen data from `params`, inserts it as an activity, then
notifies/streams and federates it.

`params[:local]` only counts as false when it is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
@doc """
Inserts, streams and federates an "Accept" activity described by `params`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
302
@doc """
Inserts, streams and federates a "Reject" activity described by `params`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
307
# Shared implementation of `accept/1` and `reject/1`: builds an activity of
# the given `type` from the actor's AP id, the recipients and the object,
# optionally with a preset id, then inserts, streams and federates it.
# `:local` defaults to true when absent from `params`.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)

  data =
    Maps.put_if_present(
      %{"type" => type, "actor" => actor.ap_id, "object" => object, "to" => to},
      "id",
      Map.get(params, :activity_id, nil)
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
323
@doc """
Inserts, streams and federates an "Update" activity.

`params` must provide `:to`, `:cc`, `:actor` and `:object`; `:activity_id`
optionally presets the activity id. `params[:local]` only counts as false when
it is exactly `false`.
"""
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    Maps.put_if_present(
      %{"type" => "Update", "actor" => actor, "object" => object, "to" => to, "cc" => cc},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
345
@doc """
Inserts a follow activity from `follower` to `followed` inside a database
transaction.

The only option read from `opts` is `:skip_notify_and_stream`.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
  transaction = fn -> do_follow(follower, followed, activity_id, local, opts) end

  case Repo.transaction(transaction) do
    {:ok, result} -> result
    failure -> failure
  end
end
354
# Transaction body of `follow/5`: inserts the follow data, notifies/streams
# unless `:skip_notify_and_stream` is set, and federates. `{:error, _}`
# results roll the transaction back.
defp do_follow(follower, followed, activity_id, local, opts) do
  skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- skip_notify_and_stream || notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
367
@doc """
Cancels the latest follow from `follower` to `followed` and inserts the
matching unfollow activity, all inside a database transaction.

Returns `nil` when there is no follow activity to undo.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = fn -> do_unfollow(follower, followed, activity_id, local) end

  case Repo.transaction(transaction) do
    {:ok, result} -> result
    failure -> failure
  end
end
376
# Transaction body of `unfollow/4`: marks the latest follow activity as
# "cancelled", inserts the unfollow activity built from it, then
# notifies/streams and federates. Yields nil when no follow activity exists;
# `{:error, _}` results roll the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
390
@doc """
Inserts a block activity from `blocker` against `blocked` inside a database
transaction.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  transaction = fn -> do_block(blocker, blocked, activity_id, local) end

  case Repo.transaction(transaction) do
    {:ok, result} -> result
    failure -> failure
  end
end
399
# Transaction body of `block/4`.
#
# When `[:activitypub, :unfollow_blocked]` is enabled and the blocker
# currently follows the blocked user, that follow is undone first (the result
# of the unfollow is ignored). The block activity is then inserted,
# notified/streamed and federated. `{:error, _}` results roll the
# transaction back.
defp do_block(blocker, blocked, activity_id, local) do
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` instead of `and`: the setting may be unset (nil) or otherwise
  # non-boolean, and `and` raises BadBooleanError on such a left operand.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
417
@doc """
Inserts a flag activity built from `params` and emails a report to every
superuser that has an email address.

`"to"` is always emptied; `"cc"` holds `account.ap_id` unless
`params[:forward]` is exactly `false`. The activity is federated in its
stripped form (see `strip_report_status_data/1`), while the unstripped one is
notified/streamed and returned.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(&is_nil(&1.email))
    |> Enum.each(fn superuser ->
      report_email =
        Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)

      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
457
@doc """
Inserts a "Move" activity from `origin` to `target`.

The move is refused unless `origin.ap_id` is listed in
`target.also_known_as`. On success the activity is notified/streamed and
federated, and a `"move_following"` background job is enqueued for the two
user ids.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
483
@doc """
Builds the query for the "Create" activities of a `context`, newest first.

Recipients are restricted to the public collection plus, when `opts[:user]`
is given, that user's AP id and followings. The block, poll-vote and
`:exclude_id` filters and the optional preloads are driven by `opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  user = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
512
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
519
@doc """
Returns the id of the newest "direct" activity in `context`, or `nil` when
there is none.

Preloading is skipped by default; a `:skip_preload` entry in `opts` overrides that.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_activities =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  Repo.one(from(a in direct_activities, limit: 1, select: a.id))
end
530
@doc """
Fetches a page of activities addressed to the public collection.

`:user` is dropped from `opts` before the query is built; unlisted activities
are only filtered out when `opts` says so (see `fetch_public_activities/2`).
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], opts)

  public_query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
540
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  strict_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(strict_opts, pagination)
end
547
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility (the
# `activity_visibility` SQL function) equals the requested visibility, or is
# one of the requested visibilities when a list is given.
#
# An invalid visibility is logged and the query is returned unchanged, the
# same way `exclude_visibility/2` behaves. Previously these branches returned
# the result of `Logger.error/1` (`:ok`), which broke every pipeline step
# after this one. The offending value is logged with `inspect/1` so that terms
# that are not chardata cannot raise while building the message.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# A nil visibility means "no restriction requested" and falls through to the
# catch-all clause below instead of being reported as an error.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
584
# Removes activities whose computed visibility (the `activity_visibility` SQL
# function) matches `:exclude_visibilities`, given either as one visibility or
# as a list. Invalid values are logged and the query is left unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
627
# Applies the `thread_visibility` SQL check for the viewing user's AP id.
# Skipped when the config (third argument) or the user has
# `skip_thread_containment` set, or when no user is given in the options.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
642
@doc """
Fetches activities authored by `user` as seen by `reading_user`.

`:user` and `:actor_id` are set in `params` before fetching; no activity type
filter is added. The fetched list is returned reversed.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    params
    |> Map.put(:user, reading_user)
    |> Map.put(:actor_id, user.ap_id)

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
657
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

The reader's block and mute filters are applied unless the reader blocks
`user`. The fetched list is returned reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    params
    |> Map.put(:type, ["Create", "Announce"])
    |> Map.put(:user, reading_user)
    |> Map.put(:actor_id, user.ap_id)
    |> Map.put(:pinned_activity_ids, user.pinned_activities)

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      base_params
      |> Map.put(:blocking_user, reading_user)
      |> Map.put(:muting_user, reading_user)
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
683
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
`:offset` pagination. The fetched list is returned reversed.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
695
# Recipient list used to scope user timelines. In godmode the list is empty;
# otherwise it is the public collection plus, for a known reader, the reader's
# own AP id and followings.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
705
# Drops "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so combining it with
# `skip_preload: true` raises.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
725
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing key disables the filter.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
733
# Excludes activities whose object is tagged with any entry of a non-empty
# `:tag_reject` list. Needs the joined object, so combining it with
# `skip_preload: true` raises.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
746
# Keeps only activities whose object carries every tag of a non-empty
# `:tag_all` list. Needs the joined object, so combining it with
# `skip_preload: true` raises.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
759
# Keeps only activities whose object carries the given `:tag` (a string) or
# any of the given tags (a list). Needs the joined object, so combining it
# with `skip_preload: true` raises.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\? (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, _), do: query
779
# Keeps activities whose recipients overlap with `recipients`. With a user,
# activities authored by that user are OR-ed in through `or_where`. An empty
# recipient list disables the filter.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
793
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
799
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
805
# Filters on the activity's "type": exact match for a string, membership
# (`= ANY`) for any other `:type` value.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
815
# Keeps only activities whose "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
821
# Keeps only activities whose object lists `:favorited_by` (an AP id) in its
# "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
830
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, so combining `:only_media`
# with `skip_preload: true` raises.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
844
# Reply filtering.
#
# * `exclude_replies: true` keeps only objects without "inReplyTo".
# * `reply_visibility: "self"` additionally keeps replies that list the
#   filtering user among the activity's recipients.
# * `reply_visibility: "following"` additionally keeps replies whose
#   recipients (minus the activity's own actor) overlap with the filtering
#   user and their cached friends, and replies authored by the filtering user.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: user,
       reply_visibility: "following"
     }) do
  allowed_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^allowed_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
888
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
894
# Hides activities authored by, or addressed ("to") to, users muted by
# `:muting_user`. Unless preloading is skipped, rows matched by the
# `thread_mute` binding are hidden as well. `with_muted: true` disables the
# filter entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
914
# Applies the block list of `:blocking_user`, joining the object first if the
# query has no `:object` binding yet. An activity is kept only when all of
# these hold:
#
#   * its actor is not a blocked user;
#   * its recipients do not overlap with the blocked users;
#   * `recipients_contain_blocked_domains` is false for its recipients;
#   * it is not an "Announce" addressed ("to") to a blocked user;
#   * its actor's domain is not blocked, or the actor is in the user's friends list;
#   * the object's actor's domain is not blocked, or that actor is in the
#     user's friends list.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
961
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in their "cc" (a missing "cc" is treated as empty).
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
975
# With `pinned: true`, keeps only the activities listed in `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
981
# Hides "Announce" activities made by users whose reblogs `:muting_user` has
# muted. The id list may be passed in as `:reblog_muted_users_ap_ids`;
# otherwise it is looked up for the user.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
998
# Keeps only activities authored by users whose nickname ends in
# "@<instance>". The matching AP ids are loaded eagerly with a separate query.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _), do: query
1012
# Drops activities whose object is an "Answer", unless
# `include_poll_votes: true`. Only applied when the query already has an
# `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1024
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true`. Only applied when the query already has an
# `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1036
# Drops activities authored by invisible users, unless `invisible_actors: true`.
# The invisible AP ids are loaded eagerly with a separate query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: id} -> id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1047
# Drops the activity with the given `:exclude_id` (only when it is a binary).
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1053
# Preloads the activities' objects unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1060
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1067
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1074
# Sets the thread-muted field for `opts[:muting_user]`, falling back to
# `opts[:user]`, unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts[:muting_user] || opts[:user])
end
1081
# Orders by id in the direction given by `:order` (`:desc` or `:asc`); any
# other options leave the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1093
# Builds the three option maps consumed by the blocked / muted / muted-reblogs
# restrictions of `fetch_activities_query/2`.
#
# The relationship ap_ids are looked up once for `opts[:muting_user]`:
#   * `:mute` and `:reblog_mute` whenever a muting user is given;
#   * `:block` additionally, but only when `opts[:blocking_user]` is that same user.
#
# Each looked-up list is added under its `*_ap_ids` key only if `opts` does not
# already contain that key, so values supplied by the caller take precedence.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  ap_ids_by_type = User.outgoing_relationships_ap_ids(muting_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, ap_ids_by_type[:block]),
    Map.put_new(opts, :muted_users_ap_ids, ap_ids_by_type[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, ap_ids_by_type[:reblog_mute])
  }
end
1115
@doc """
Builds the activity query for `recipients`, shaped by `opts`.

Starting from `Activity`, the query is passed, in this order, through the
optional preloads and ordering, the `restrict_*` filters,
`Activity.restrict_deactivated_users/1` and finally the `exclude_*` filters.
The query is only built here; nothing is executed.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Stage 1: preloads and ordering.
  prepared =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # Stage 2: restrictions. The blocked / muted / muted-reblogs steps receive
  # the option maps enriched with the relationship ap_ids.
  restricted =
    prepared
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(blocked_opts)
    |> restrict_muted(muted_opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, containment_config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)

  # Stage 3: exclusions.
  restricted
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1157
@doc """
Fetches one page of activities for `recipients`.

The list memberships of `opts[:user]` are appended to `recipients` before the
query is built. The fetched page is reversed and then passed through
`maybe_update_cc/3` together with those memberships and the user.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  memberships = Pleroma.List.memberships(opts[:user])
  all_recipients = recipients ++ memberships

  page =
    all_recipients
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(memberships, opts[:user])
end
1166
@doc """
Fetches the activities favourited by `user`.

Results are ordered by the id of the user's "Like" activity (descending)
rather than by the id of the favourited activity, and that like id is also
exposed as `pagination_id` on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  favourites_query =
    likes_with_targets
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already set above, so pagination is told to skip its own.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1184
# For every activity that has a non-empty "bcc" containing at least one entry
# of `list_memberships`, prepends the user's ap_id to the activity's "cc".
# All other activities are returned unchanged. This is a no-op when
# `list_memberships` is empty or no `%User{}` is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a bare value. Without List.wrap/1 the
        # cons below would produce an improper list such as `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1200
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` while also containing the public address
# (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1209
@doc """
Fetches one page of activities using the bounded recipient filter.

The base query is built with an empty recipient list; the recipient matching
is done by `fetch_activities_bounded_query/3` with `recipients` and
`recipients_with_public`. The fetched page is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1221
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data. When `opts[:actor]` is given it is added under the "actor" key
via `Maps.put_if_present/3`.

Any result of `Upload.store/2` other than `{:ok, data}` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1230
# Extracts a URL string from an actor's "url" property, which may be:
#   * a binary                       -> returned as is
#   * a map with a binary "href"     -> the href
#   * a list                         -> resolved from its first element
# Anything else (including an empty list) yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
1242
# Converts a decoded actor object (string-keyed map) into the atom-keyed map of
# user attributes built below.
#
# The input comes from remote servers, so malformed parts are skipped instead
# of raising:
#   * "icon" / "image" that is not a map with a "url" yields no avatar / banner;
#   * "attachment" and "tag" may be missing, null, a single object or a list;
#   * attachments that are not "PropertyValue" maps are ignored;
#   * "Emoji" tags without an icon url or a string name are ignored.
#
# `:nickname` is "<preferredUsername>@<host of id>", or nil when the actor has
# no "preferredUsername".
defp object_to_user_data(data) do
  # Wraps the url of an "icon"/"image" entry in the image shape used for
  # avatar and banner; nil when there is no usable url.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  # List.wrap/1 turns nil into [] and a single object into a one-element list.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]),
        do: Map.take(field, ["name", "value"])

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{},
        do: {String.trim(name, ":"), url}

  # Read before maybe_fix_user_object/1 rewrites `data`, as in the original order.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
1328
@doc """
Fetches the remote following and followers collections of `user` and
summarises them.

On success returns `{:ok, map}` with `:follower_count` and `:following_count`
(taken from each collection's "totalItems", 0 when that is not an integer) and
`:hide_follows` / `:hide_followers` (as reported by `collection_private/1`).

Failures are returned as `{:error, reason}`; a failing step that did not yield
an `{:error, _}` tuple is wrapped in one.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1348
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1351
@doc """
Merges remotely fetched follow information into `user_data[:info]`.

The fetch only happens when all of the following hold:

  * `[:instance, :external_user_synchronization]` is configured as `true`;
  * `user_data[:type]` is "Person" or "Service";
  * both `:following_address` and `:follower_address` are set.

In every other case, including a failed fetch (which is logged as an error),
`user_data` is returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): this reads `user_data[:type]`, while `object_to_user_data/1`
  # in this file stores the actor type under `:actor_type`. For data coming
  # from there the type check presumably never passes - confirm which key is
  # intended.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # One of the preconditions evaluated to false: nothing to do.
    {check, false} when check in [:enabled, :user_type_check, :collections_available] ->
      user_data

    # Anything else (a failed fetch, or a non-boolean config value) is logged.
    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1382
# Decides whether a fetched collection is private (`{:ok, true}`) or
# readable (`{:ok, false}`).
#
#   * An embedded "first" page of type CollectionPage / OrderedCollectionPage
#     means readable.
#   * Any other "first" value is fetched: a page of one of those types means
#     readable, an error carrying HTTP status 401 or 403 means private, and
#     every other outcome is returned as `{:error, _}`.
#   * A collection without "first" is treated as private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1399
@doc """
Runs the actor object `data` through `MRF.filter/1` and converts the filtered
object into user attributes.

Returns `{:ok, user_data}`, or `{:error, result}` where `result` is whatever
`MRF.filter/1` returned instead of `{:ok, _}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1407
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes,
optionally enriched by `maybe_update_follow_information/1`.

Returns `{:ok, user_data}` or `{:error, reason}`. Every failure is logged: at
debug level when the object has been deleted, at error level otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, maybe_update_follow_information(data)}
  else
    # A deleted actor is an expected outcome, so it is not reported as an error.
    {:error, "Object has been deleted" = e} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    {:error, e} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    # A result that is neither `{:ok, _}` nor `{:error, _}` used to raise
    # WithClauseError here. It is wrapped instead, matching how the other
    # functions in this module treat results of the same fetch call.
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
1422
@doc """
Creates or refreshes the user identified by `ap_id`.

  * A cached user that is not AP-enabled is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Otherwise the actor is fetched; a cached user is updated from the fetched
    data, and a new user is inserted when none was cached.

A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  known_user = User.get_cached_by_ap_id(ap_id)

  cond do
    known_user && !User.ap_enabled?(known_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, remote_data} ->
          if known_user do
            known_user
            |> User.remote_user_changeset(remote_data)
            |> User.update_and_set_cache()
          else
            remote_data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          failure
      end
  end
end
1443
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user behind
the returned "ap_id".

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields no
"ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1451
# Filters out broken threads: the result is whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1456
# Post-processing check for a single activity; currently this is only the
# broken-thread containment.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1461
@doc """
Builds the query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1468 end