Merge branch 'develop' into a1batross-develop-patch-62810
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
382 superuser
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
385 end)
386
387 {:ok, activity}
388 else
389 {:error, error} -> Repo.rollback(error)
390 end
391 end
392
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
395 params = %{
396 "type" => "Move",
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
400 }
401
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
406
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
410 })
411
412 {:ok, activity}
413 else
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
415 err -> err
416 end
417 end
418
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
421
422 recipients =
423 if opts[:user],
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
425 else: public
426
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
434 |> where(
435 [activity],
436 fragment(
437 "?->>'type' = ? and ?->>'context' = ?",
438 activity.data,
439 "Create",
440 activity.data,
441 ^context
442 )
443 )
444 |> exclude_poll_votes(opts)
445 |> exclude_id(opts)
446 |> order_by([activity], desc: activity.id)
447 end
448
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(opts)
453 |> Repo.all()
454 end
455
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 context
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
462 |> limit(1)
463 |> select([a], a.id)
464 |> Repo.one()
465 end
466
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
470
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
475 end
476
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 opts
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
482 end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 from(
490 a in query,
491 where:
492 fragment(
493 "activity_visibility(?, ?, ?) = ANY (?)",
494 a.actor,
495 a.recipients,
496 a.data,
497 ^visibility
498 )
499 )
500 else
501 Logger.error("Could not restrict visibility to #{visibility}")
502 end
503 end
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
507 from(
508 a in query,
509 where:
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
511 )
512 end
513
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518
519 defp restrict_visibility(query, _visibility), do: query
520
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 not fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not exclude visibility to #{visibility}")
537 query
538 end
539 end
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ?",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561
562 defp exclude_visibility(query, _visibility), do: query
563
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
565 do: query
566
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
568 do: query
569
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
571 from(
572 a in query,
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
574 )
575 end
576
577 defp restrict_thread_visibility(query, _, _), do: query
578
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
580 params =
581 params
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584
585 %{
586 godmode: params[:godmode],
587 reading_user: reading_user
588 }
589 |> user_activities_recipients()
590 |> fetch_activities(params)
591 |> Enum.reverse()
592 end
593
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
595 params =
596 params
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
601
602 params =
603 if User.blocks?(reading_user, user) do
604 params
605 else
606 params
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
609 end
610
611 pagination_type = Map.get(params, :pagination_type) || :keyset
612
613 %{
614 godmode: params[:godmode],
615 reading_user: reading_user
616 }
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
619 |> Enum.reverse()
620 end
621
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
624
625 %{
626 godmode: params[:godmode],
627 reading_user: reading_user
628 }
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
631 |> Enum.reverse()
632 end
633
634 defp user_activities_recipients(%{godmode: true}), do: []
635
636 defp user_activities_recipients(%{reading_user: reading_user}) do
637 if reading_user do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
639 else
640 [Constants.as_public()]
641 end
642 end
643
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
646 end
647
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
649 from(
650 [activity, object] in query,
651 where:
652 fragment(
653 "?->>'type' != ? or ?->>'actor' != ?",
654 activity.data,
655 "Announce",
656 object.data,
657 ^actor
658 )
659 )
660 end
661
662 defp restrict_announce_object_actor(query, _), do: query
663
664 defp restrict_since(query, %{since_id: ""}), do: query
665
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
668 end
669
670 defp restrict_since(query, _), do: query
671
672 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
673 raise "Can't use the child object without preloading!"
674 end
675
676 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
677 from(
678 [_activity, object] in query,
679 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
680 )
681 end
682
683 defp restrict_tag_reject(query, _), do: query
684
685 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
686 raise "Can't use the child object without preloading!"
687 end
688
689 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
690 from(
691 [_activity, object] in query,
692 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
693 )
694 end
695
696 defp restrict_tag_all(query, _), do: query
697
698 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
699 raise "Can't use the child object without preloading!"
700 end
701
702 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
703 from(
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
706 )
707 end
708
709 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
710 from(
711 [_activity, object] in query,
712 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
713 )
714 end
715
716 defp restrict_tag(query, _), do: query
717
718 defp restrict_recipients(query, [], _user), do: query
719
720 defp restrict_recipients(query, recipients, nil) do
721 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
722 end
723
724 defp restrict_recipients(query, recipients, user) do
725 from(
726 activity in query,
727 where: fragment("? && ?", ^recipients, activity.recipients),
728 or_where: activity.actor == ^user.ap_id
729 )
730 end
731
732 defp restrict_local(query, %{local_only: true}) do
733 from(activity in query, where: activity.local == true)
734 end
735
736 defp restrict_local(query, _), do: query
737
738 defp restrict_remote(query, %{remote: true}) do
739 from(activity in query, where: activity.local == false)
740 end
741
742 defp restrict_remote(query, _), do: query
743
744 defp restrict_actor(query, %{actor_id: actor_id}) do
745 from(activity in query, where: activity.actor == ^actor_id)
746 end
747
748 defp restrict_actor(query, _), do: query
749
750 defp restrict_type(query, %{type: type}) when is_binary(type) do
751 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
752 end
753
754 defp restrict_type(query, %{type: type}) do
755 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
756 end
757
758 defp restrict_type(query, _), do: query
759
760 defp restrict_state(query, %{state: state}) do
761 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
762 end
763
764 defp restrict_state(query, _), do: query
765
766 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
767 from(
768 [_activity, object] in query,
769 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
770 )
771 end
772
773 defp restrict_favorited_by(query, _), do: query
774
775 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
776 raise "Can't use the child object without preloading!"
777 end
778
779 defp restrict_media(query, %{only_media: true}) do
780 from(
781 [activity, object] in query,
782 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
783 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
784 )
785 end
786
787 defp restrict_media(query, _), do: query
788
789 defp restrict_replies(query, %{exclude_replies: true}) do
790 from(
791 [_activity, object] in query,
792 where: fragment("?->>'inReplyTo' is null", object.data)
793 )
794 end
795
796 defp restrict_replies(query, %{
797 reply_filtering_user: %User{} = user,
798 reply_visibility: "self"
799 }) do
800 from(
801 [activity, object] in query,
802 where:
803 fragment(
804 "?->>'inReplyTo' is null OR ? = ANY(?)",
805 object.data,
806 ^user.ap_id,
807 activity.recipients
808 )
809 )
810 end
811
812 defp restrict_replies(query, %{
813 reply_filtering_user: %User{} = user,
814 reply_visibility: "following"
815 }) do
816 from(
817 [activity, object] in query,
818 where:
819 fragment(
820 """
821 ?->>'type' != 'Create' -- This isn't a Create
822 OR ?->>'inReplyTo' is null -- this isn't a reply
823 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
824 -- unless they are the author (because authors
825 -- are also part of the recipients). This leads
826 -- to a bug that self-replies by friends won't
827 -- show up.
828 OR ? = ? -- The actor is us
829 """,
830 activity.data,
831 object.data,
832 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
833 activity.recipients,
834 activity.actor,
835 activity.actor,
836 ^user.ap_id
837 )
838 )
839 end
840
841 defp restrict_replies(query, _), do: query
842
843 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
844 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
845 end
846
847 defp restrict_reblogs(query, _), do: query
848
849 defp restrict_muted(query, %{with_muted: true}), do: query
850
851 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
852 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
853
854 query =
855 from([activity] in query,
856 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
857 where:
858 fragment(
859 "not (?->'to' \\?| ?) or ? = ?",
860 activity.data,
861 ^mutes,
862 activity.actor,
863 ^user.ap_id
864 )
865 )
866
867 unless opts[:skip_preload] do
868 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
869 else
870 query
871 end
872 end
873
874 defp restrict_muted(query, _), do: query
875
876 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
877 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
878 domain_blocks = user.domain_blocks || []
879
880 following_ap_ids = User.get_friends_ap_ids(user)
881
882 query =
883 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
884
885 from(
886 [activity, object: o] in query,
887 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
888 where:
889 fragment(
890 "((not (? && ?)) or ? = ?)",
891 activity.recipients,
892 ^blocked_ap_ids,
893 activity.actor,
894 ^user.ap_id
895 ),
896 where:
897 fragment(
898 "recipients_contain_blocked_domains(?, ?) = false",
899 activity.recipients,
900 ^domain_blocks
901 ),
902 where:
903 fragment(
904 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
905 activity.data,
906 activity.data,
907 ^blocked_ap_ids
908 ),
909 where:
910 fragment(
911 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
912 activity.actor,
913 ^domain_blocks,
914 activity.actor,
915 ^following_ap_ids
916 ),
917 where:
918 fragment(
919 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
920 o.data,
921 ^domain_blocks,
922 o.data,
923 ^following_ap_ids
924 )
925 )
926 end
927
928 defp restrict_blocked(query, _), do: query
929
930 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
931 from(
932 activity in query,
933 where:
934 fragment(
935 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
936 activity.data,
937 ^[Constants.as_public()]
938 )
939 )
940 end
941
942 defp restrict_unlisted(query, _), do: query
943
944 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
945 from(activity in query, where: activity.id in ^ids)
946 end
947
948 defp restrict_pinned(query, _), do: query
949
950 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
951 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
952
953 from(
954 activity in query,
955 where:
956 fragment(
957 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
958 activity.data,
959 activity.actor,
960 ^muted_reblogs
961 )
962 )
963 end
964
965 defp restrict_muted_reblogs(query, _), do: query
966
967 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
968 from(
969 activity in query,
970 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
971 )
972 end
973
974 defp restrict_instance(query, _), do: query
975
976 defp restrict_filtered(query, %{user: %User{} = user}) do
977 case Filter.compose_regex(user) do
978 nil ->
979 query
980
981 regex ->
982 from([activity, object] in query,
983 where:
984 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
985 activity.actor == ^user.ap_id
986 )
987 end
988 end
989
990 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
991 restrict_filtered(query, %{user: user})
992 end
993
994 defp restrict_filtered(query, _), do: query
995
996 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
997
998 defp exclude_poll_votes(query, _) do
999 if has_named_binding?(query, :object) do
1000 from([activity, object: o] in query,
1001 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1002 )
1003 else
1004 query
1005 end
1006 end
1007
1008 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1009
1010 defp exclude_chat_messages(query, _) do
1011 if has_named_binding?(query, :object) do
1012 from([activity, object: o] in query,
1013 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1014 )
1015 else
1016 query
1017 end
1018 end
1019
1020 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1021
1022 defp exclude_invisible_actors(query, _opts) do
1023 invisible_ap_ids =
1024 User.Query.build(%{invisible: true, select: [:ap_id]})
1025 |> Repo.all()
1026 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1027
1028 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1029 end
1030
1031 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1032 from(activity in query, where: activity.id != ^id)
1033 end
1034
1035 defp exclude_id(query, _), do: query
1036
1037 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1038
1039 defp maybe_preload_objects(query, _) do
1040 query
1041 |> Activity.with_preloaded_object()
1042 end
1043
1044 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1045
1046 defp maybe_preload_bookmarks(query, opts) do
1047 query
1048 |> Activity.with_preloaded_bookmark(opts[:user])
1049 end
1050
1051 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1052 query
1053 |> Activity.with_preloaded_report_notes()
1054 end
1055
1056 defp maybe_preload_report_notes(query, _), do: query
1057
1058 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1059
1060 defp maybe_set_thread_muted_field(query, opts) do
1061 query
1062 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1063 end
1064
1065 defp maybe_order(query, %{order: :desc}) do
1066 query
1067 |> order_by(desc: :id)
1068 end
1069
1070 defp maybe_order(query, %{order: :asc}) do
1071 query
1072 |> order_by(asc: :id)
1073 end
1074
1075 defp maybe_order(query, _), do: query
1076
1077 defp fetch_activities_query_ap_ids_ops(opts) do
1078 source_user = opts[:muting_user]
1079 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1080
1081 ap_id_relationships =
1082 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1083 [:block | ap_id_relationships]
1084 else
1085 ap_id_relationships
1086 end
1087
1088 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1089
1090 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1091 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1092
1093 restrict_muted_reblogs_opts =
1094 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1095
1096 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1097 end
1098
1099 def fetch_activities_query(recipients, opts \\ %{}) do
1100 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1101 fetch_activities_query_ap_ids_ops(opts)
1102
1103 config = %{
1104 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1105 }
1106
1107 Activity
1108 |> maybe_preload_objects(opts)
1109 |> maybe_preload_bookmarks(opts)
1110 |> maybe_preload_report_notes(opts)
1111 |> maybe_set_thread_muted_field(opts)
1112 |> maybe_order(opts)
1113 |> restrict_recipients(recipients, opts[:user])
1114 |> restrict_replies(opts)
1115 |> restrict_tag(opts)
1116 |> restrict_tag_reject(opts)
1117 |> restrict_tag_all(opts)
1118 |> restrict_since(opts)
1119 |> restrict_local(opts)
1120 |> restrict_remote(opts)
1121 |> restrict_actor(opts)
1122 |> restrict_type(opts)
1123 |> restrict_state(opts)
1124 |> restrict_favorited_by(opts)
1125 |> restrict_blocked(restrict_blocked_opts)
1126 |> restrict_muted(restrict_muted_opts)
1127 |> restrict_filtered(opts)
1128 |> restrict_media(opts)
1129 |> restrict_visibility(opts)
1130 |> restrict_thread_visibility(opts, config)
1131 |> restrict_reblogs(opts)
1132 |> restrict_pinned(opts)
1133 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1134 |> restrict_instance(opts)
1135 |> restrict_announce_object_actor(opts)
1136 |> restrict_filtered(opts)
1137 |> Activity.restrict_deactivated_users()
1138 |> exclude_poll_votes(opts)
1139 |> exclude_chat_messages(opts)
1140 |> exclude_invisible_actors(opts)
1141 |> exclude_visibility(opts)
1142 end
1143
1144 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1145 list_memberships = Pleroma.List.memberships(opts[:user])
1146
1147 fetch_activities_query(recipients ++ list_memberships, opts)
1148 |> Pagination.fetch_paginated(opts, pagination)
1149 |> Enum.reverse()
1150 |> maybe_update_cc(list_memberships, opts[:user])
1151 end
1152
1153 @doc """
1154 Fetch favorites activities of user with order by sort adds to favorites
1155 """
1156 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1157 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1158 user.ap_id
1159 |> Activity.Queries.by_actor()
1160 |> Activity.Queries.by_type("Like")
1161 |> Activity.with_joined_object()
1162 |> Object.with_joined_activity()
1163 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1164 |> order_by([like, _, _], desc_nulls_last: like.id)
1165 |> Pagination.fetch_paginated(
1166 Map.merge(params, %{skip_order: true}),
1167 pagination
1168 )
1169 end
1170
1171 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1172 Enum.map(activities, fn
1173 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1174 if Enum.any?(bcc, &(&1 in list_memberships)) do
1175 update_in(activity.data["cc"], &[user_ap_id | &1])
1176 else
1177 activity
1178 end
1179
1180 activity ->
1181 activity
1182 end)
1183 end
1184
1185 defp maybe_update_cc(activities, _, _), do: activities
1186
1187 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1188 from(activity in query,
1189 where:
1190 fragment("? && ?", activity.recipients, ^recipients) or
1191 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1192 ^Constants.as_public() in activity.recipients)
1193 )
1194 end
1195
1196 def fetch_activities_bounded(
1197 recipients,
1198 recipients_with_public,
1199 opts \\ %{},
1200 pagination \\ :keyset
1201 ) do
1202 fetch_activities_query([], opts)
1203 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1204 |> Pagination.fetch_paginated(opts, pagination)
1205 |> Enum.reverse()
1206 end
1207
1208 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1209 def upload(file, opts \\ []) do
1210 with {:ok, data} <- Upload.store(file, opts) do
1211 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1212
1213 Repo.insert(%Object{data: obj_data})
1214 end
1215 end
1216
1217 @spec get_actor_url(any()) :: binary() | nil
1218 defp get_actor_url(url) when is_binary(url), do: url
1219 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1220
1221 defp get_actor_url(url) when is_list(url) do
1222 url
1223 |> List.first()
1224 |> get_actor_url()
1225 end
1226
1227 defp get_actor_url(_url), do: nil
1228
1229 defp object_to_user_data(data) do
1230 avatar =
1231 data["icon"]["url"] &&
1232 %{
1233 "type" => "Image",
1234 "url" => [%{"href" => data["icon"]["url"]}]
1235 }
1236
1237 banner =
1238 data["image"]["url"] &&
1239 %{
1240 "type" => "Image",
1241 "url" => [%{"href" => data["image"]["url"]}]
1242 }
1243
1244 fields =
1245 data
1246 |> Map.get("attachment", [])
1247 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1248 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1249
1250 emojis =
1251 data
1252 |> Map.get("tag", [])
1253 |> Enum.filter(fn
1254 %{"type" => "Emoji"} -> true
1255 _ -> false
1256 end)
1257 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1258 {String.trim(name, ":"), url}
1259 end)
1260
1261 is_locked = data["manuallyApprovesFollowers"] || false
1262 capabilities = data["capabilities"] || %{}
1263 accepts_chat_messages = capabilities["acceptsChatMessages"]
1264 data = Transmogrifier.maybe_fix_user_object(data)
1265 is_discoverable = data["discoverable"] || false
1266 invisible = data["invisible"] || false
1267 actor_type = data["type"] || "Person"
1268
1269 public_key =
1270 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1271 data["publicKey"]["publicKeyPem"]
1272 else
1273 nil
1274 end
1275
1276 shared_inbox =
1277 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1278 data["endpoints"]["sharedInbox"]
1279 else
1280 nil
1281 end
1282
1283 user_data = %{
1284 ap_id: data["id"],
1285 uri: get_actor_url(data["url"]),
1286 ap_enabled: true,
1287 banner: banner,
1288 fields: fields,
1289 emoji: emojis,
1290 is_locked: is_locked,
1291 is_discoverable: is_discoverable,
1292 invisible: invisible,
1293 avatar: avatar,
1294 name: data["name"],
1295 follower_address: data["followers"],
1296 following_address: data["following"],
1297 bio: data["summary"] || "",
1298 actor_type: actor_type,
1299 also_known_as: Map.get(data, "alsoKnownAs", []),
1300 public_key: public_key,
1301 inbox: data["inbox"],
1302 shared_inbox: shared_inbox,
1303 accepts_chat_messages: accepts_chat_messages
1304 }
1305
1306 # nickname can be nil because of virtual actors
1307 if data["preferredUsername"] do
1308 Map.put(
1309 user_data,
1310 :nickname,
1311 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1312 )
1313 else
1314 Map.put(user_data, :nickname, nil)
1315 end
1316 end
1317
1318 def fetch_follow_information_for_user(user) do
1319 with {:ok, following_data} <-
1320 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1321 {:ok, hide_follows} <- collection_private(following_data),
1322 {:ok, followers_data} <-
1323 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1324 {:ok, hide_followers} <- collection_private(followers_data) do
1325 {:ok,
1326 %{
1327 hide_follows: hide_follows,
1328 follower_count: normalize_counter(followers_data["totalItems"]),
1329 following_count: normalize_counter(following_data["totalItems"]),
1330 hide_followers: hide_followers
1331 }}
1332 else
1333 {:error, _} = e -> e
1334 e -> {:error, e}
1335 end
1336 end
1337
1338 defp normalize_counter(counter) when is_integer(counter), do: counter
1339 defp normalize_counter(_), do: 0
1340
1341 def maybe_update_follow_information(user_data) do
1342 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1343 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1344 {_, true} <-
1345 {:collections_available,
1346 !!(user_data[:following_address] && user_data[:follower_address])},
1347 {:ok, info} <-
1348 fetch_follow_information_for_user(user_data) do
1349 info = Map.merge(user_data[:info] || %{}, info)
1350
1351 user_data
1352 |> Map.put(:info, info)
1353 else
1354 {:user_type_check, false} ->
1355 user_data
1356
1357 {:collections_available, false} ->
1358 user_data
1359
1360 {:enabled, false} ->
1361 user_data
1362
1363 e ->
1364 Logger.error(
1365 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1366 )
1367
1368 user_data
1369 end
1370 end
1371
1372 defp collection_private(%{"first" => %{"type" => type}})
1373 when type in ["CollectionPage", "OrderedCollectionPage"],
1374 do: {:ok, false}
1375
1376 defp collection_private(%{"first" => first}) do
1377 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1378 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1379 {:ok, false}
1380 else
1381 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1382 {:error, _} = e -> e
1383 e -> {:error, e}
1384 end
1385 end
1386
1387 defp collection_private(_data), do: {:ok, true}
1388
1389 def user_data_from_user_object(data) do
1390 with {:ok, data} <- MRF.filter(data) do
1391 {:ok, object_to_user_data(data)}
1392 else
1393 e -> {:error, e}
1394 end
1395 end
1396
1397 def fetch_and_prepare_user_from_ap_id(ap_id) do
1398 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1399 {:ok, data} <- user_data_from_user_object(data) do
1400 {:ok, maybe_update_follow_information(data)}
1401 else
1402 # If this has been deleted, only log a debug and not an error
1403 {:error, "Object has been deleted" = e} ->
1404 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1405 {:error, e}
1406
1407 {:error, {:reject, reason} = e} ->
1408 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1409 {:error, e}
1410
1411 {:error, e} ->
1412 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1413 {:error, e}
1414 end
1415 end
1416
1417 def maybe_handle_clashing_nickname(data) do
1418 with nickname when is_binary(nickname) <- data[:nickname],
1419 %User{} = old_user <- User.get_by_nickname(nickname),
1420 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1421 Logger.info(
1422 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1423 data[:ap_id]
1424 }, renaming."
1425 )
1426
1427 old_user
1428 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1429 |> User.update_and_set_cache()
1430 else
1431 {:ap_id_comparison, true} ->
1432 Logger.info(
1433 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1434 )
1435
1436 _ ->
1437 nil
1438 end
1439 end
1440
1441 def make_user_from_ap_id(ap_id) do
1442 user = User.get_cached_by_ap_id(ap_id)
1443
1444 if user && !User.ap_enabled?(user) do
1445 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1446 else
1447 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1448 if user do
1449 user
1450 |> User.remote_user_changeset(data)
1451 |> User.update_and_set_cache()
1452 else
1453 maybe_handle_clashing_nickname(data)
1454
1455 data
1456 |> User.remote_user_changeset()
1457 |> Repo.insert()
1458 |> User.set_cache()
1459 end
1460 end
1461 end
1462 end
1463
1464 def make_user_from_nickname(nickname) do
1465 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1466 make_user_from_ap_id(ap_id)
1467 else
1468 _e -> {:error, "No AP id in WebFinger"}
1469 end
1470 end
1471
1472 # filter out broken threads
1473 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1474 entire_thread_visible_for_user?(activity, user)
1475 end
1476
1477 # do post-processing on a specific activity
1478 def contain_activity(%Activity{} = activity, %User{} = user) do
1479 contain_broken_threads(activity, user)
1480 end
1481
1482 def fetch_direct_messages_query do
1483 Activity
1484 |> restrict_type(%{type: "Create"})
1485 |> restrict_visibility(%{visibility: "direct"})
1486 |> order_by([activity], asc: activity.id)
1487 end
1488 end