Merge branch 'features/apc2s-pagination' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Maps
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.Object.Containment
16 alias Pleroma.Object.Fetcher
17 alias Pleroma.Pagination
18 alias Pleroma.Repo
19 alias Pleroma.Upload
20 alias Pleroma.User
21 alias Pleroma.Web.ActivityPub.MRF
22 alias Pleroma.Web.ActivityPub.Transmogrifier
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
# Computes `{recipients, to, cc}` for an activity map.
#
# For Announce activities the addressed ids are filtered: an id that resolves to a
# cached user is kept only when that user follows the announcing actor; ids that do
# not resolve to a user are always kept. See issue #164 for why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  announcer = User.get_cached_by_ap_id(data["actor"])

  keep? = fn ap_id ->
    case User.get_cached_by_ap_id(ap_id) do
      nil -> true
      known_user -> User.following?(known_user, announcer)
    end
  end

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(keep?)

  {recipients, to, cc}
end

# Create activities additionally address their own actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  actor = Map.get(data, "actor", [])

  {Enum.uniq(Enum.concat([to, cc, bcc, [actor]])), to, cc}
end

# Every other activity type: plain concatenation of to, cc and bcc.
defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))

  {Enum.concat([to, cc, bcc]), to, cc}
end
69
# Tells whether an activity may be inserted on behalf of `actor`.
#
# - `nil` actor: nothing to check, returns `true`.
# - actor resolves to a cached user whose `deactivated` flag is `false`: `true`.
# - anything else (deactivated user, or no user found for the ap_id): `false`.
#
# The previous implementation read `user.deactivated` without checking that a user
# was actually found, which raised when the lookup returned `nil`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{deactivated: false} -> true
    _missing_or_deactivated -> false
  end
end
82
# Returns whether the object's content length is within the configured
# `[:instance, :remote_limit]`. Activities without a non-nil embedded object
# content always pass.
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  [:instance, :remote_limit]
  |> Config.get()
  |> Kernel.>=(String.length(body))
end

defp check_remote_limit(_activity), do: true
89
# Bumps the actor's note count when `object` is public; otherwise returns the actor
# untouched, wrapped in an `:ok` tuple.
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
93
# Lowers the actor's note count when `object` is public; otherwise returns the actor
# untouched, wrapped in an `:ok` tuple.
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
97
# For a Create whose embedded object has an "inReplyTo", increments the replies
# counter of the replied-to object, but only when the embedded object is public
# (returns `nil` otherwise). Any other input is a no-op.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id), else: nil
end

def increase_replies_count_if_reply(_create_data), do: :noop
108
# For an Object whose data has an "inReplyTo", decrements the replies counter of the
# replied-to object, but only when the object data is public (returns `nil`
# otherwise). Any other input is a no-op.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id), else: nil
end

def decrease_replies_count_if_reply(_object), do: :noop
118
# A Create whose embedded object carries both "inReplyTo" and "name" is counted as a
# vote for the option `name` on the replied-to object. Anything else is a no-op.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "actor" => voter,
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
128
# Inserts `object` as an Activity row without running any of the checks done by
# `insert/4`.
#
# `meta` must contain the `:local` key (`Keyword.fetch!/2` raises otherwise).
# On success returns `{:ok, activity, meta}` with `meta` passed through unchanged;
# on failure the `{:error, _}` tuple from `Repo.insert/1` is returned as-is.
#
# The spec previously advertised a two-element `{:ok, _}` tuple, which did not match
# the three-element tuple actually returned.
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  with {:ok, inserted} <- Repo.insert(activity) do
    {:ok, inserted, meta}
  end
end
143
# Validates and stores an activity map.
#
# Steps, in order: return the already-known activity if `Activity.normalize/1` finds
# one; fill in defaults; check the actor is active (unless bypassed); check the remote
# content limit; run MRF; compute recipients; short-circuit for fake activities;
# containment check; insert the embedded object; insert the activity row and enqueue
# the "fetch_data_for_activity" background job.
#
# Any step result that is not handled explicitly is returned wrapped as
# `{:error, value}`.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # Shared by the persisted and the fake code paths.
  build_activity = fn data, recipients ->
    %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  end

  with nil <- Activity.normalize(map),
       data = lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, filtered} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered) do
    {:ok, inserted} = Repo.insert(build_activity.(stored, recipients))

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      # Fake activities are never written to the database; they get a placeholder id.
      fake_activity = %{build_activity.(data, recipients) | id: "pleroma:fakeid"}

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
189
# Creates notifications for the activity, creates or bumps its conversation, then
# streams out both the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
198
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}` when both the conversation and the user
# are available; otherwise returns whichever intermediate result did not match.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    no_conversation ->
      no_conversation
  end
end
206
# Returns the freshly reloaded participations of a conversation given as
# `{:ok, conversation}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
214
# Preloads the user of each participation and pushes them to the "participation"
# stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
222
# Streams the participations of the conversation identified by the object's
# "context", but only if a latest activity id can still be fetched for that context
# with `user` as both the viewing and the blocking user. Returns `nil` when there is
# none, and the lookup result itself when no conversation is found.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    not_found ->
      not_found
  end
end

def stream_out_participations(_, _), do: :noop
238
# Streams Create, Announce and Delete activities to the topics computed for them.
# Every other activity is ignored.
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
249
# Runs `do_create/2` inside a database transaction and unwraps the transaction's
# `{:ok, result}`; any other transaction outcome is returned unchanged.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
256
# Builds and inserts a Create activity, then updates counters, notifies, streams and
# federates. Fake activities stop right after insertion; in the `:benchmark` env the
# function stops after the reply/vote counters. `{:error, _}` results roll the
# surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ = increase_replies_count_if_reply(create_data),
       _ = increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {short_circuit, true, activity} when short_circuit in [:quick_insert, :fake] ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
289
# Builds a Listen activity from `params`, inserts it, then notifies, streams and
# federates it. A failing step's value is returned as-is.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
308
# Shorthand for `accept_or_reject/2` with the "Accept" type.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
313
# Shorthand for `accept_or_reject/2` with the "Reject" type.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
318
# Inserts an activity of the given `type` from `actor` (a struct with an `ap_id`)
# about `object`, then notifies, streams and federates it. `:local` defaults to
# `true`; an optional `:activity_id` becomes the activity's "id".
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Maps.put_if_present(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
333
# Inserts an Update activity built from `params`, then notifies, streams and
# federates it. An optional `:activity_id` becomes the activity's "id".
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only an explicit `false` disables the local flag
  local = params[:local] != false

  data =
    Maps.put_if_present(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
353
# Runs `do_follow/4` inside a database transaction and unwraps the transaction's
# `{:ok, result}`; any other transaction outcome is returned unchanged.
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
362
# Inserts the follow activity, then notifies, streams and federates it.
# `{:error, _}` results roll the surrounding transaction back.
defp do_follow(follower, followed, activity_id, local) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
373
# Runs `do_unfollow/4` inside a database transaction and unwraps the transaction's
# `{:ok, result}`; any other transaction outcome is returned unchanged.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
382
# Marks the latest follow activity as "cancelled", inserts the unfollow activity
# and then notifies, streams and federates it. Returns `nil` when a step yields
# `nil`; `{:error, _}` results roll the surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
396
# Runs `do_block/4` inside a database transaction and unwraps the transaction's
# `{:ok, result}`; any other transaction outcome is returned unchanged.
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
405
# Optionally unfollows the blocked user first (when `[:activitypub, :unfollow_blocked]`
# is set and a follow activity exists), then inserts the block activity and notifies,
# streams and federates it. `{:error, _}` results roll the transaction back.
defp do_block(blocker, blocked, activity_id, local) do
  # `&&` short-circuits, so the follow lookup only happens when the option is enabled.
  if Config.get([:activitypub, :unfollow_blocked]) && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
423
# Inserts a Flag (report) activity, federates a stripped copy of it and emails the
# report to every superuser that has an email address.
#
# The report is addressed ("cc") to the reported account only when `:forward` is not
# explicitly `false`; "to" is always empty.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      report_email =
        Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)

      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
463
# Inserts a Move activity from `origin` to `target`, federates it and enqueues the
# "move_following" background job. Refused unless the target lists the origin's
# ap_id in `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
489
# Builds the query for Create activities belonging to `context`, newest first.
# Visibility is limited to public recipients plus, when `opts["user"]` is given,
# that user and everyone they follow.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts["user"] do
      absent when absent in [nil, false] -> public
      _present -> [opts["user"].ap_id | User.following(opts["user"])] ++ public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts["user"])

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
518
# Runs `fetch_activities_for_context_query/2` and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
525
# Returns the id of the newest activity in `context` (or `nil`). Preloading is
# skipped unless `opts` explicitly sets "skip_preload" itself.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
535
# Fetches a page of activities addressed to the public collection. The "user" option
# is dropped before querying; unlisted activities are filtered out when
# `opts["restrict_unlisted"]` is set.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.drop(opts, ["user"])
  public_query = fetch_activities_query([Constants.as_public()], opts)

  if opts["restrict_unlisted"] do
    restrict_unlisted(public_query)
  else
    public_query
  end
  |> Pagination.fetch_paginated(opts, pagination)
end
551
# Same as `fetch_public_or_unlisted_activities/2`, with "restrict_unlisted" forced on.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, "restrict_unlisted", true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
558
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose computed visibility (the `activity_visibility`
# SQL function) equals `opts[:visibility]`, or is any of them when a list is given.
#
# An unsupported visibility is logged and the query is returned unchanged, mirroring
# `exclude_visibility/2`. Previously those branches returned the result of
# `Logger.error/1` (`:ok`), which was then piped into the next query builder instead
# of a query. The offending value is logged with `inspect/1` because interpolating
# an arbitrary term (map, list of atoms, ...) can itself raise.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

# No `:visibility` option given: nothing to restrict.
defp restrict_visibility(query, _visibility), do: query
598
# Removes from `query` the activities whose computed visibility matches
# `opts["exclude_visibilities"]` (a single visibility or a list of them).
# Unsupported values are logged and leave the query untouched.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
641
# Applies the `thread_visibility` SQL check for the viewing user, unless thread
# containment is skipped by the passed config or by the user's own setting, or no
# user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: viewer_ap_id}}, _config) do
  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id') = true", ^viewer_ap_id, a.data)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
660
# Fetches activities authored by `user` as seen by `reading_user`, without limiting
# the activity type. The fetched list is returned reversed.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{"user" => reading_user, "actor_id" => user.ap_id})

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
676
# Fetches the Create and Announce activities authored by `user` as seen by
# `reading_user`. The reader's block and mute filters are applied only when the
# reader does not block `user`. The fetched list is returned reversed.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "pinned_activity_ids" => user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{"blocking_user" => reading_user, "muting_user" => reading_user})
    end

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  Enum.reverse(fetch_activities(recipients, params))
end
703
# Fetches Create and Announce activities visible to `reading_user` using offset
# pagination. The fetched list is returned reversed.
def fetch_statuses(reading_user, params) do
  params = Map.put(params, "type", ["Create", "Announce"])

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
718
# Recipient list used to scope user timelines: empty in "godmode", otherwise the
# public collection plus, when a reading user is given, that user and everyone they
# follow.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  case reading_user do
    absent when absent in [nil, false] ->
      [Constants.as_public()]

    reader ->
      [Constants.as_public(), reader.ap_id | User.following(reader)]
  end
end
730
# Keeps only activities with an id greater than `opts["since_id"]`; an empty string
# or a missing option leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
738
# Drops activities whose object carries any of the tags in `opts["tag_reject"]`.
# Needs the object binding, so it raises when preloading is skipped.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected_tags})
     when is_list(rejected_tags) and rejected_tags != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts), do: query
752
# Keeps activities whose object carries every tag in `opts["tag_all"]`.
# Needs the object binding, so it raises when preloading is skipped.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required_tags})
     when is_list(required_tags) and required_tags != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts), do: query
766
# Keeps activities whose object carries `opts["tag"]`: any of them for a list,
# exactly that one for a string. Needs the object binding, so it raises when
# preloading is skipped.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _opts), do: query
786
# Limits the query to activities whose recipients overlap `recipients`. With a user,
# activities authored by that user are admitted through an additional OR condition.
# An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
800
# Keeps only local activities when `opts["local_only"]` is `true`.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
806
# Keeps only activities authored by `opts["actor_id"]`, when given.
defp restrict_actor(query, %{"actor_id" => actor_ap_id}),
  do: where(query, [activity], activity.actor == ^actor_ap_id)

defp restrict_actor(query, _opts), do: query
812
# Keeps only activities of the given type: a string matches exactly, any other
# value is passed to SQL `ANY(...)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
822
# Keeps only activities whose data "state" equals `opts["state"]`, when given.
defp restrict_state(query, %{"state" => wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _opts), do: query
828
# Keeps only activities whose object lists `opts["favorited_by"]` in its "likes".
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
837
# With a truthy "only_media" option, keeps only activities whose object has a
# non-empty "attachment". Needs the object binding, so it raises when preloading is
# skipped.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => flag}) when flag in [true, "true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
850
# Reply filtering:
# - truthy "exclude_replies": only activities whose object has no "inReplyTo";
# - "reply_visibility" => "self": non-replies, plus activities that list the
#   filtering user among their recipients;
# - "reply_visibility" => "following": non-replies, plus activities addressed to the
#   filtering user or their friends (the activity's own actor is removed from the
#   recipients before the overlap test), plus activities authored by the user.
defp restrict_replies(query, %{"exclude_replies" => flag}) when flag in [true, "true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
894
# With a truthy "exclude_reblogs" option, drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in [true, "true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
900
# Hides activities authored by, or addressed ("to") to, users muted by
# `opts["muting_user"]`. Unless preloading is skipped, rows with a thread-mute entry
# are hidden as well. A truthy "with_muted" disables all of this.
defp restrict_muted(query, %{"with_muted" => flag}) when flag in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  # Prefer the ap_ids handed in through opts; look them up only when absent.
  muted_ap_ids = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts["skip_preload"] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
920
# Hides activities involving users or domains blocked by `opts["blocking_user"]`:
# blocked actors, blocked recipients, recipients on blocked domains, Announces
# addressed to blocked users, and actors (of the activity or of its object) on
# blocked domains unless the blocking user follows them. Joins the object when the
# query does not already have an `:object` binding.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
  blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    case has_named_binding?(query, :object) do
      true -> query
      false -> Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
967
# Drops activities that carry the public collection in "cc"; a missing "cc" is
# coalesced to an empty JSON value first.
defp restrict_unlisted(query) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public
    )
  )
end
979
# TODO: once all endpoints are migrated to OpenAPI, compare `pinned` with `true`
#   (boolean) only; the same goes for `restrict_media/2`, `restrict_replies/2`,
#   `restrict_reblogs/2` and `restrict_muted/2`.

# With a truthy "pinned" option, keeps only the activities whose id is listed in
# `opts["pinned_activity_ids"]`.
defp restrict_pinned(query, %{"pinned" => flag, "pinned_activity_ids" => pinned_ids})
     when flag in [true, "true", "1"] do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
990
# Hides Announce activities authored by users whose reblogs `opts["muting_user"]`
# has muted. The ap_ids come from opts when provided, otherwise they are looked up.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1007
# Keeps only activities authored by users whose nickname ends in
# "@<opts["instance"]>". The matching ap_ids are loaded with a separate query first.
defp restrict_instance(query, %{"instance" => instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _opts), do: query
1021
# Drops activities whose object is of type "Answer", unless "include_poll_votes" is
# `true`. Only applies when the query has an `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1033
# Drops activities authored by users flagged as invisible, unless the
# "invisible_actors" option is `true`. The invisible ap_ids are loaded with a
# separate query.
defp exclude_invisible_actors(query, %{"invisible_actors" => true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1044
# Drops the activity whose id equals `opts["exclude_id"]` (binary ids only).
defp exclude_id(query, %{"exclude_id" => excluded_id}) when is_binary(excluded_id),
  do: where(query, [activity], activity.id != ^excluded_id)

defp exclude_id(query, _opts), do: query
1050
# Preloads the activity's object unless "skip_preload" is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1057
# Preloads the bookmark of `opts["user"]` unless "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts["user"])
1064
# Preloads report notes only when "preload_report_notes" is `true`.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1071
# Sets the thread-muted field for the muting user (falling back to `opts["user"]`)
# unless "skip_preload" is `true`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1078
# Orders by id in the direction given by `opts[:order]` (`:desc` or `:asc`); any
# other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1090
# Builds the three option maps consumed by the blocked / muted / muted-reblogs
# restrictions, resolving the relationship ap_ids with a single call to
# `User.outgoing_relationships_ap_ids/2`.
#
# Mutes and reblog mutes are requested whenever `"muting_user"` is set; blocks
# are requested only when `"blocking_user"` is set and equals the muting user.
# Keys already present in `opts` take precedence over the resolved lists.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts["muting_user"]
  blocking_user = opts["blocking_user"]

  mute_kinds = if muting_user, do: [:mute, :reblog_mute], else: []
  block_kinds = if blocking_user && blocking_user == muting_user, do: [:block], else: []

  ap_ids_by_kind = User.outgoing_relationships_ap_ids(muting_user, mute_kinds ++ block_kinds)

  {
    Map.put_new(opts, "blocked_users_ap_ids", ap_ids_by_kind[:block]),
    Map.put_new(opts, "muted_users_ap_ids", ap_ids_by_kind[:mute]),
    Map.put_new(opts, "reblog_muted_users_ap_ids", ap_ids_by_kind[:reblog_mute])
  }
end
1113
@doc """
Builds (without executing) the activity query for `recipients`.

`opts` is passed to every preload, restriction and exclusion step below. The
steps are applied in a fixed order; the query is returned for the caller to
paginate or run.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # Stage 1: preloads and ordering.
  preloaded_query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  # Stage 2: restrictions, in the same order as they were originally chained.
  restricted_query =
    preloaded_query
    |> restrict_recipients(recipients, opts["user"])
    |> restrict_replies(opts)
    |> restrict_tag(opts)
    |> restrict_tag_reject(opts)
    |> restrict_tag_all(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(blocked_opts)
    |> restrict_muted(muted_opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, thread_config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(muted_reblogs_opts)
    |> restrict_instance(opts)
    |> Activity.restrict_deactivated_users()

  # Stage 3: exclusions.
  restricted_query
  |> exclude_poll_votes(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1153
@doc """
Fetches one page of activities addressed to `recipients` or to any of the list
memberships of `opts["user"]`.

The paginated result is reversed before being returned, and activities that
matched through a list membership get the user added to their "cc".
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(memberships, user)
end
1162
@doc """
Fetches the activities a user has favourited, most recently favourited first.

Results are ordered by the id of the user's "Like" activity (descending, nulls
last), so pagination is asked to skip its own ordering.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  paging_params = Map.put(params, "skip_order", true)

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(favourites_query, paging_params, pagination, :object_activity)
end
1181
# For a user with at least one list membership, prepends the user's ap_id to
# the "cc" of every activity whose non-empty "bcc" contains one of those
# memberships. All other activities are returned unchanged.
#
# The existing "cc" value is normalised with `List.wrap/1` first: a missing
# "cc" (nil) or a single bare string would otherwise produce an improper list
# such as `[ap_id | nil]`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

# No list memberships, or no user struct: nothing to rewrite.
defp maybe_update_cc(activities, _, _), do: activities
1198
@doc """
Restricts `query` to activities whose recipients overlap `recipients`, or
whose recipients overlap `recipients_with_public` and also include the public
address (`Constants.as_public/0`).
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1207
@doc """
Fetches one page of activities using the bounded recipient filter of
`fetch_activities_bounded_query/3` instead of the plain recipient restriction.

The paginated result is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1219
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the
# resulting data, with "actor" added when `opts[:actor]` is given. Any result
# of `Upload.store/2` other than `{:ok, data}` is returned as-is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1228
# Extracts a URL string from an actor's "url" property, which may be a plain
# string, a map with a binary "href", or a list (the first element is used).
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) ->
      plain

    %{"href" => href} when is_binary(href) ->
      href

    candidates when is_list(candidates) ->
      candidates
      |> List.first()
      |> get_actor_url()

    _other ->
      nil
  end
end
1240
# Converts a fetched actor object (string-keyed map) into the atom-keyed
# attribute map returned as `{:ok, user_data}`.
#
# The actor document comes from a remote server, so its optional parts are
# read defensively instead of crashing the whole conversion:
#   * "icon" / "image" that are not maps with a usable "url" yield no image;
#   * "attachment" entries that are not "PropertyValue" maps are ignored, and
#     a nil or single-map "attachment" is accepted;
#   * "Emoji" tags without an icon url or a string name are skipped.
defp object_to_user_data(data) do
  avatar = actor_image_attachment(data["icon"])
  banner = actor_image_attachment(data["image"])

  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Entries that do not match the generator pattern are skipped by `for`.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # Read before the object is passed through the Transmogrifier fix-up below.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox
  }

  # nickname can be nil because of virtual actors
  user_data =
    if data["preferredUsername"] do
      Map.put(
        user_data,
        :nickname,
        "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
      )
    else
      Map.put(user_data, :nickname, nil)
    end

  {:ok, user_data}
end

# Wraps the "url" of an actor's icon/image map into an "Image" attachment map.
# Returns nil when the value is not a map or carries no usable "url".
defp actor_image_attachment(%{"url" => url}) when url not in [nil, false] do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp actor_image_attachment(_image), do: nil
1329
@doc """
Fetches the user's remote following and followers collections and derives the
follow counters and privacy flags from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`, `:follower_count`
and `:following_count`. Counters that are not integers are reported as 0.
Any failing step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1349
# Integer counters pass through unchanged; anything else counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1352
@doc """
Merges remote follow information into `user_data[:info]` when external user
synchronization is enabled, the user type is "Person" or "Service", and both
collection addresses are present.

`user_data` is always returned: unchanged when a precondition is false, and
unchanged (after logging an error) when any other step fails.
"""
# NOTE(review): the type check reads `user_data[:type]`, while the map built by
# `object_to_user_data/1` in this file stores the type under `:actor_type`.
# Confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition evaluated to exactly `false`: nothing to do, nothing to log.
    {tag, false} when tag in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1383
# Decides whether a remote collection is private, returning `{:ok, boolean}`
# or `{:error, reason}`.
#
# An embedded first page of a known page type means the collection is visible.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

# Otherwise "first" is fetched: a page of a known type means visible, while an
# error carrying a 401/403 response means private.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

# No "first" page at all: treated as private.
defp collection_private(_data), do: {:ok, true}
1400
@doc """
Runs an actor object through `MRF.filter/1` and converts the filtered object
into user data.

Returns `{:ok, user_data}`; any non-matching step result is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = converted <- object_to_user_data(filtered) do
    converted
  else
    failure -> {:error, failure}
  end
end
1409
@doc """
Fetches the remote actor at `ap_id`, converts it into user data and, where
applicable, enriches it with follow information.

Returns `{:ok, user_data}`. A failing step is logged (at debug level for
deleted objects, at error level otherwise) and returned wrapped as
`{:error, failure}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = deleted ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(deleted)}")
      {:error, deleted}

    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1425
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the remote actor is fetched: an existing user is updated (and the
cache refreshed), a new one is inserted and cached. A failed fetch is returned
wrapped as `{:error, failure}`.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        {:error, failure}
    end
  end
end
1448
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user behind
the returned AP id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1456
# Filter out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1461
# Post-processing hook for a single activity; currently only applies the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1466
@doc """
Returns a query for "Create" activities with direct visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1473 end