Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into update-validator
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
# Collects the addressees of an activity.
#
# Returns `{recipients, to, cc}`, where `recipients` is the concatenation of
# the "to", "cc" and "bcc" fields (each defaulting to `[]`). For "Create"
# activities the actor is appended as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. Defaulting it to `[]` and then
  # wrapping it in a list used to smuggle a bogus `[]` entry into the
  # recipients.
  actor =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation, duplicates are kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
51
# An activity without an actor passes the check. Otherwise the actor has to
# resolve to a cached user that is not deactivated.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _unknown -> false
  end
end
60
# True when the object's content fits within the configured
# `[:instance, :remote_limit]`; data without object content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_data), do: true
67
# Bumps the actor's note count for public objects; otherwise hands the actor
# back untouched as `{:ok, actor}`.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
71
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# For a "Create" whose object carries "inReplyTo", increases the replies
# counter of the replied-to object, but only when the new object is public
# (returns `nil` when it is not). Anything else is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
86
# A "Create" whose object has both "inReplyTo" and "name" is counted as a
# vote on the replied-to object; anything else is a `:noop`.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
96
@object_types ["ChatMessage"]

@doc """
Persists `object` and passes `meta` through.

Data whose "type" is one of `@object_types` is stored via `Object.create/1`;
everything else is inserted as an `Activity` together with its recipients.

On success returns `{:ok, record, meta}`. A failing step's result is returned
as-is. Raises `KeyError` when an activity is persisted without a `:local`
entry in `meta`.
"""
# The spec used to promise a 2-tuple although both clauses return `meta` too.
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
118
@doc """
Inserts an activity for `map`, unless `Activity.normalize/1` already yields
one, in which case that activity is returned.

Before inserting, the data gets its defaults, the actor is checked (skipped
when `bypass_actor_check` is set), the remote content limit is enforced, the
MRF filter is applied and the child object is contained and inserted.

With `fake` set nothing is written: an unsaved activity with the id
`"pleroma:fakeid"` is returned instead. Any failing step is returned wrapped
as `{:error, step_result}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data) do
    inserted =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    # A failed insert or expiration is not recoverable here and raises.
    {:ok, activity} = maybe_create_activity_expiration(inserted)

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
166
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
175
# When a freshly inserted activity carries "expires_at", registers an
# expiration for it. A failure to do so is returned instead of the activity.
# Every other insert result passes through unchanged.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
183
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`, or whichever intermediate
# result did not match (a failed bump, or no cached user for `actor`).
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    failure ->
      failure
  end
end
191
# Force-reloads and returns the participations of a conversation; yields an
# empty list for anything that is not `{:ok, conversation}`.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
199
# Streams the given participations (with their users preloaded) on the
# "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

# Streams the participations of the conversation an object belongs to, but
# only if the context still has a direct activity for `user`. Objects without
# a "context" are a `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
    %Conversation{ap_id: ap_id, participations: participations} =
      Repo.preload(conversation, :participations)

    latest_direct_id =
      fetch_latest_direct_activity_id_for_context(ap_id, %{user: user, blocking_user: user})

    if latest_direct_id, do: stream_out_participations(participations)
  end
end

def stream_out_participations(_, _), do: :noop
225
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed
by `Topics.get_activity_topics/1`. Returns `:noop` for anything else.
"""
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ~w[Create Announce Delete] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
236
@doc """
Runs the creation of a "Create" activity inside a database transaction and
returns the inner result; a rolled-back transaction is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
243
# Builds and inserts a "Create" activity, then updates reply, poll-vote and
# note counters, notifies, streams and federates.
#
# Fake activities stop right after the insert. In the `:benchmark` env the
# note count, notifications and federation are skipped. An `{:error, _}` from
# any step rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only accept false as false value
  local = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} -> {:ok, activity}
    {:fake, true, activity} -> {:ok, activity}
    {:error, message} -> Repo.rollback(message)
  end
end
277
@doc """
Builds a "listen" activity from `params`, inserts it, then notifies, streams
and federates it. A failing step's result is returned unchanged.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
297
@doc """
Creates an "Accept" activity from `params` (`:to`, `:actor`, `:object`, and
optionally `:local` and `:activity_id`).
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
302
@doc """
Creates a "Reject" activity from `params` (`:to`, `:actor`, `:object`, and
optionally `:local` and `:activity_id`).
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
307
# Shared implementation of `accept/1` and `reject/1`: inserts an activity of
# the given `type`, then notifies, streams and federates it. `:local`
# defaults to `true`; an "id" is only set when `:activity_id` is present.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)

  data =
    Maps.put_if_present(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      Map.get(params, :activity_id)
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
323
@doc """
Inserts an "Update" activity addressed to `to`/`cc`, then notifies, streams
and federates it. An "id" is only set when `:activity_id` is given.
"""
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only an explicit `false` makes the activity non-local
  local = params[:local] != false

  base = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  data = Maps.put_if_present(base, "id", params[:activity_id])

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
345
@doc """
Creates a follow activity from `follower` to `followed` inside a database
transaction and returns the inner result.

`opts` may contain `skip_notify_and_stream: true` to suppress notifications
and streaming.
"""
@spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
  transaction =
    Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end)

  case transaction do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
354
# Inserts and federates the follow activity, notifying and streaming unless
# `:skip_notify_and_stream` is set. Errors roll the transaction back.
defp do_follow(follower, followed, activity_id, local, opts) do
  skip? = Keyword.get(opts, :skip_notify_and_stream, false)
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- skip? || notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
367
@doc """
Undoes the latest follow from `follower` to `followed` inside a database
transaction. Returns `nil` when there is no follow activity to undo.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction =
    Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
376
# Marks the latest follow activity as "cancelled" and inserts, announces and
# federates the matching unfollow activity. Returns `nil` without a previous
# follow; errors roll the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
390
@doc """
Creates a block activity from `blocker` against `blocked` inside a database
transaction and returns the inner result.
"""
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
399
# Inserts, announces and federates a block activity. When
# `[:activitypub, :unfollow_blocked]` is enabled and the blocker follows the
# blocked user, that follow is undone first. Errors roll the transaction back.
defp do_block(blocker, blocked, activity_id, local) do
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  # `&&` rather than `and`: an unset (nil) setting must simply count as
  # disabled instead of raising BadBooleanError.
  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
417
@doc """
Files a report ("flag") activity.

The activity is inserted and announced in full, while the version handed to
federation has its status data stripped. Unless `:forward` is explicitly
`false`, the reported account is put in "cc". Afterwards every superuser
with an email address is sent a report mail.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
457
@doc """
Publishes a "Move" activity from `origin` to `target` and enqueues the
"move_following" background job.

Fails with an error tuple unless `origin.ap_id` is listed in the target's
`also_known_as`.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  if origin.ap_id in target.also_known_as do
    params = %{
      "type" => "Move",
      "actor" => origin.ap_id,
      "object" => origin.ap_id,
      "target" => target.ap_id
    }

    with {:ok, activity} <- insert(params, local) do
      notify_and_stream(activity)
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
483
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Visible are public activities plus, when `opts[:user]` is set, those
addressed to that user or to anyone they follow, and the user's own.
Blocks, poll votes and `opts[:exclude_id]` are filtered as configured.
"""
def fetch_activities_for_context_query(context, opts) do
  viewer = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, viewer)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
512
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
519
@doc """
Returns the id of the newest direct-visibility activity in `context`, or
`nil` when there is none.

Preloading is skipped by default; `opts` may override that.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # The spec allows a keyword list, which `Map.merge/2` would reject with a
  # BadMapError, so normalise to a map first. Maps pass through unchanged.
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
530
@doc """
Fetches a page of activities addressed to the public collection. Any `:user`
in `opts` is dropped first; unlisted activities are only excluded when
`opts` has `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], opts)

  query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
540
@doc """
Like `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
547
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility equals the
# `:visibility` option (a single value) or is one of its elements (a list).
#
# An invalid value is logged and the query is returned unchanged, which
# matches `exclude_visibility/2`. The previous code returned the `:ok` of
# `Logger.error/1` instead of a query and so broke every caller's pipeline.
# `nil` counts as "no restriction requested". The value is logged with
# `inspect/1` because plain interpolation raises for malformed values such as
# a list of atoms.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
584
# Drops activities whose computed visibility equals the
# `:exclude_visibilities` option (a single value) or is one of its elements
# (a list). Invalid values are logged and the query is returned unchanged.
#
# The offending value is rendered with `inspect/1`: plain interpolation
# raises for exactly the malformed values reported here (e.g. a list of
# atoms, or a map).
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not exclude visibility to #{inspect(visibility)}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      not fragment(
        "activity_visibility(?, ?, ?) = ?",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

defp exclude_visibility(query, _visibility), do: query
627
# Applies the `thread_visibility` database check for the requesting user.
# The check is skipped when thread containment is disabled in the passed
# config or on the user, and when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config) do
  query
end

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
642
@doc """
Fetches the activities authored by `user` as seen by `reading_user`, in
reverse of the order `fetch_activities/2` returns them.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
657
@doc """
Fetches the "Create" and "Announce" activities of `user` as seen by
`reading_user`, in reverse of the order `fetch_activities/2` returns them.

Block and mute filtering on behalf of the reader is applied unless the
reader blocks `user`.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
683
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination, in reverse of the order `fetch_activities/3` returns them.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
695
# Recipient list used to scope a user's timeline: empty in godmode (an empty
# list makes `restrict_recipients/3` a no-op), otherwise the public
# collection plus, for a logged-in reader, themselves and everyone they
# follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
705
# Hides "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, hence the raise when
# preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
725
# Keeps only activities with an id greater than `:since_id`; an empty string
# is treated like an absent option.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
733
# Drops activities whose object carries any of the `:tag_reject` tags. Needs
# the joined object, hence the raise when preloading is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
746
# Keeps only activities whose object carries every one of the `:tag_all`
# tags. Needs the joined object, hence the raise when preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_tag_all(query, _opts), do: query
759
# Keeps only activities whose object carries the given `:tag` (a string) or
# any of the given tags (a list). Needs the joined object, hence the raise
# when preloading is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
779
# Keeps activities whose recipients overlap with `recipients`. With a user,
# activities authored by that user are admitted too. An empty recipient list
# disables the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
793
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
799
# Keeps only activities whose actor is `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
805
# Keeps only activities of the given `:type`: a string is compared for
# equality, any other value is passed to SQL `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
815
# Keeps only activities whose data "state" equals `:state`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
821
# Keeps only activities whose object lists `:favorited_by` among its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
830
# With `only_media: true`, keeps only activities whose object has a
# non-empty "attachment". Needs the joined object, hence the raise when
# preloading is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  where(query, [_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
843
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps replies addressed to the filtering user;
#   * `reply_visibility: "following"` keeps replies addressed to the
#     filtering user or their friends, and the user's own replies.
# Non-replies always pass.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  allowed_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^allowed_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
887
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
893
# Hides activities authored by, or addressed ("to") to, users muted by the
# `:muting_user`. Unless preloading is skipped, rows that have a thread-mute
# join entry are dropped as well. `with_muted: true` disables all of this.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # A pre-fetched mute list takes precedence over querying it again.
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  query =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts[:skip_preload] do
    query
  else
    from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
913
# Hides activities involving users or domains blocked by the
# `:blocking_user`: blocked actors, blocked recipients, recipients on blocked
# domains, announces addressed to blocked users, and actors or object authors
# on blocked domains unless the blocking user follows them.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  # A pre-fetched block list takes precedence over querying it again.
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  # The object-author filter needs the object join; add it only if missing.
  query =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
960
# With `restrict_unlisted: true`, drops activities that have the public
# collection in their "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
974
# With `pinned: true`, keeps only the activities listed in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
980
# Hides "Announce" activities from actors whose reblogs the `:muting_user`
# has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # A pre-fetched list takes precedence over querying it again.
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
997
# Keeps only activities by users whose nickname ends in "@<instance>". The
# matching ap_ids are loaded with a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _opts), do: query
1011
# Drops activities whose object is of type "Answer", unless
# `include_poll_votes: true` is given. Only applies when the object is joined.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1023
# Drops activities whose object is of type "ChatMessage", unless
# `include_chat_messages: true` is given. Only applies when the object is
# joined.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1035
# Drops activities authored by invisible users, unless
# `invisible_actors: true` is given. The invisible ap_ids are loaded with a
# separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = for %{ap_id: ap_id} <- invisible_users, do: ap_id

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1046
# Drops the activity with the given `:exclude_id` (binary ids only).
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1052
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1059
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1066
# Preloads report notes only when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1073
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1080
# Orders by id in the requested direction; any other `:order` value (or none)
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1092
# Loads the muting user's outgoing relationships in one call and returns
# three option maps, for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2`, each pre-seeded with the matching ap_id list.
# Blocks are only included when the blocking user is the muting user. Values
# already present in `opts` win over the pre-seeded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded[:reblog_mute]}, opts)
  }
end
1114
@doc """
Builds the activity query for `recipients`.

Starting from `Activity`, the preload and ordering steps are applied first,
then every `restrict_*` step, and finally the `exclude_*` steps, each driven
by `opts`. The query is returned without being executed.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  config = %{skip_thread_containment: Config.get([:instance, :skip_thread_containment])}

  preloaded =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  restricted =
    preloaded
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(blocked_opts)
    |> restrict_muted(muted_opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()

  restricted
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1156
@doc """
Runs `fetch_activities_query/2` and returns one page of activities.

The result of `Pleroma.List.memberships/1` for `opts[:user]` is appended to
`recipients` before querying. The fetched page is reversed and then passed
through `maybe_update_cc/3` together with those memberships.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  list_memberships = Pleroma.List.memberships(viewer)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
1165
@doc """
Fetches the activities favourited by `user`.

The user's `Like` activities are joined to the liked object and its activity.
Each returned activity carries that object, and its `pagination_id` is set to
the id of the `Like`. Results are ordered by the `Like` id, descending, and
paginated with `skip_order: true` so the paginator keeps this ordering.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1183
# Adds the viewing user's ap_id to the "cc" of activities addressed, via "bcc",
# to one of the given list memberships.
#
# Only applies when `list_memberships` is a non-empty list and a `%User{}` is
# given; otherwise the activities are returned unchanged. Activities without a
# non-empty "bcc" list, or whose "bcc" contains none of the memberships, are
# also returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a single bare value. List.wrap/1 keeps
        # the result a proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1199
# Restricts `query` to activities whose recipients overlap `recipients`, or
# overlap `recipients_with_public` while also containing the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1208
@doc """
Fetches one page of activities using the bounded recipient filter.

The base query is built with an empty recipient list, then narrowed by
`fetch_activities_bounded_query/3`. The paginated result is reversed before
being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1220
@doc """
Stores `file` with `Upload.store/2` and inserts an `Object` built from the
stored data.

`opts[:actor]` is passed to `Maps.put_if_present/3` under the `"actor"` key.
Any result of `Upload.store/2` other than `{:ok, data}` is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1229
# Extracts a URL string from an actor's "url" value.
#
# Accepts a plain string, a map with a string "href", or a list whose first
# element is resolved recursively. Anything else, including an empty list,
# yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
1241
# Converts a fetched actor object (string-keyed map) into the atom-keyed
# attribute map used to build or update a user.
#
# Actor documents come from remote servers, so malformed parts are skipped
# instead of raising:
#   * "icon" / "image" that are not maps with a "url" produce no avatar/banner
#   * "attachment" / "tag" may be nil or a single object instead of a list
#   * attachments that are not "PropertyValue" maps are ignored
#   * "Emoji" tags without a string "name" or an icon "url" are ignored
#
# :nickname is "<preferredUsername>@<host of id>", or nil when the actor has
# no preferredUsername.
defp object_to_user_data(data) do
  avatar = object_to_user_image(data["icon"])
  banner = object_to_user_image(data["image"])

  # A non-matching generator pattern skips the element, so attachments
  # without a "type" key no longer raise FunctionClauseError.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }
end

# Wraps the "url" of an actor's icon/image object in the stored image shape.
# Returns nil when the value is not a map or has no usable "url".
defp object_to_user_image(%{"url" => url}) when url not in [nil, false] do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp object_to_user_image(_image), do: nil
1327
@doc """
Fetches the remote following and follower collections of `user`.

On success returns `{:ok, map}` with `:follower_count` and `:following_count`
(taken from each collection's `"totalItems"`) and the `:hide_follows` /
`:hide_followers` flags produced by `collection_private/1`. Any failing step
is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, {following, hide_follows}} <-
         fetch_collection_with_privacy(user.following_address),
       {:ok, {followers, hide_followers}} <-
         fetch_collection_with_privacy(user.follower_address) do
    follow_info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end

# Fetches one collection and pairs it with its privacy flag. A failing step
# is returned untouched so the caller can normalise it.
defp fetch_collection_with_privacy(address) do
  with {:ok, collection} <- Fetcher.fetch_and_contain_remote_object_from_id(address),
       {:ok, hidden} <- collection_private(collection) do
    {:ok, {collection, hidden}}
  end
end
1347
# Returns `counter` when it is an integer and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1350
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

This only happens when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is `"Person"` or `"Service"`, and both collection
addresses are present. When one of those checks is `false` the input is
returned unchanged. Any other failure is logged as an error and the input is
returned unchanged as well.
"""
# NOTE(review): this reads `user_data[:type]`, while `object_to_user_data/1`
# in this module stores the actor type under `:actor_type` - confirm which key
# callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, follow_info)
    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:enabled, :user_type_check, :collections_available] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1381
# Decides whether a follow collection is private. The callers in this module
# use the result as the hide_follows / hide_followers flag.
#
#   * "first" is an embedded (Ordered)CollectionPage -> {:ok, false}
#   * otherwise "first" is fetched: a page -> {:ok, false}, a 401/403
#     response -> {:ok, true}, any other failure -> {:error, reason}
#   * no "first" key at all -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1398
@doc """
Runs an actor object through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`. Any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1406
@doc """
Fetches the actor at `ap_id` and turns it into user data.

On success the data is passed through `maybe_update_follow_information/1` and
returned as `{:ok, data}`. An `{:error, reason}` is logged and returned as is.
The log level is debug for `"Object has been deleted"` and error for every
other reason.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      message = "Could not decode user at fetch #{ap_id}, #{inspect(reason)}"

      # A deleted actor is an expected outcome, so it is not reported as an error.
      if reason == "Object has been deleted" do
        Logger.debug(message)
      else
        Logger.error(message)
      end

      {:error, reason}
  end
end
1421
@doc """
Creates or refreshes the user identified by `ap_id`.

  * a cached user that is not AP-enabled is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`
  * a cached AP-enabled user is updated from freshly fetched data
  * otherwise a new user is inserted from the fetched data and cached

If fetching the data fails, that result is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  user = User.get_cached_by_ap_id(ap_id)

  cond do
    user && !User.ap_enabled?(user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    user ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        user
        |> User.remote_user_changeset(data)
        |> User.update_and_set_cache()
      end

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        data
        |> User.remote_user_changeset()
        |> Repo.insert()
        |> User.set_cache()
      end
  end
end
1442
@doc """
Resolves `nickname` through WebFinger and builds the user from the returned
`"ap_id"`.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields no
AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
1450
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1455
@doc """
Post-processes a single activity for `user`.

Currently this only runs the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1460
@doc """
Returns a query for `Create` activities with `"direct"` visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1467 end