Merge branch 'develop' into refactor/deactivated_user_field
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
382 superuser
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
385 end)
386
387 {:ok, activity}
388 else
389 {:error, error} -> Repo.rollback(error)
390 end
391 end
392
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
395 params = %{
396 "type" => "Move",
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
400 }
401
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
406
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
410 })
411
412 {:ok, activity}
413 else
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
415 err -> err
416 end
417 end
418
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
421
422 recipients =
423 if opts[:user],
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
425 else: public
426
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
434 |> where(
435 [activity],
436 fragment(
437 "?->>'type' = ? and ?->>'context' = ?",
438 activity.data,
439 "Create",
440 activity.data,
441 ^context
442 )
443 )
444 |> exclude_poll_votes(opts)
445 |> exclude_id(opts)
446 |> order_by([activity], desc: activity.id)
447 end
448
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(opts)
453 |> Repo.all()
454 end
455
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 context
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
462 |> limit(1)
463 |> select([a], a.id)
464 |> Repo.one()
465 end
466
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
470
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
475 end
476
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 opts
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
482 end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 from(
490 a in query,
491 where:
492 fragment(
493 "activity_visibility(?, ?, ?) = ANY (?)",
494 a.actor,
495 a.recipients,
496 a.data,
497 ^visibility
498 )
499 )
500 else
501 Logger.error("Could not restrict visibility to #{visibility}")
502 end
503 end
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
507 from(
508 a in query,
509 where:
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
511 )
512 end
513
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518
519 defp restrict_visibility(query, _visibility), do: query
520
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 not fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not exclude visibility to #{visibility}")
537 query
538 end
539 end
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ?",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561
562 defp exclude_visibility(query, _visibility), do: query
563
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
565 do: query
566
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
568 do: query
569
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
571 from(
572 a in query,
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
574 )
575 end
576
577 defp restrict_thread_visibility(query, _, _), do: query
578
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
580 params =
581 params
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584
585 %{
586 godmode: params[:godmode],
587 reading_user: reading_user
588 }
589 |> user_activities_recipients()
590 |> fetch_activities(params)
591 |> Enum.reverse()
592 end
593
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
595 params =
596 params
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
601
602 params =
603 if User.blocks?(reading_user, user) do
604 params
605 else
606 params
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
609 end
610
611 pagination_type = Map.get(params, :pagination_type) || :keyset
612
613 %{
614 godmode: params[:godmode],
615 reading_user: reading_user
616 }
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
619 |> Enum.reverse()
620 end
621
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
624
625 %{
626 godmode: params[:godmode],
627 reading_user: reading_user
628 }
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
631 |> Enum.reverse()
632 end
633
634 defp user_activities_recipients(%{godmode: true}), do: []
635
636 defp user_activities_recipients(%{reading_user: reading_user}) do
637 if reading_user do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
639 else
640 [Constants.as_public()]
641 end
642 end
643
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
646 end
647
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
649 from(
650 [activity, object] in query,
651 where:
652 fragment(
653 "?->>'type' != ? or ?->>'actor' != ?",
654 activity.data,
655 "Announce",
656 object.data,
657 ^actor
658 )
659 )
660 end
661
662 defp restrict_announce_object_actor(query, _), do: query
663
664 defp restrict_since(query, %{since_id: ""}), do: query
665
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
668 end
669
670 defp restrict_since(query, _), do: query
671
672 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
673 raise "Can't use the child object without preloading!"
674 end
675
676 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
677 from(
678 [_activity, object] in query,
679 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
680 )
681 end
682
683 defp restrict_tag_reject(query, _), do: query
684
685 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
686 raise "Can't use the child object without preloading!"
687 end
688
689 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
690 from(
691 [_activity, object] in query,
692 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
693 )
694 end
695
696 defp restrict_tag_all(query, _), do: query
697
698 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
699 raise "Can't use the child object without preloading!"
700 end
701
702 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
703 from(
704 [_activity, object] in query,
705 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
706 )
707 end
708
709 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
710 from(
711 [_activity, object] in query,
712 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
713 )
714 end
715
716 defp restrict_tag(query, _), do: query
717
718 defp restrict_recipients(query, [], _user), do: query
719
720 defp restrict_recipients(query, recipients, nil) do
721 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
722 end
723
724 defp restrict_recipients(query, recipients, user) do
725 from(
726 activity in query,
727 where: fragment("? && ?", ^recipients, activity.recipients),
728 or_where: activity.actor == ^user.ap_id
729 )
730 end
731
732 defp restrict_local(query, %{local_only: true}) do
733 from(activity in query, where: activity.local == true)
734 end
735
736 defp restrict_local(query, _), do: query
737
738 defp restrict_actor(query, %{actor_id: actor_id}) do
739 from(activity in query, where: activity.actor == ^actor_id)
740 end
741
742 defp restrict_actor(query, _), do: query
743
744 defp restrict_type(query, %{type: type}) when is_binary(type) do
745 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
746 end
747
748 defp restrict_type(query, %{type: type}) do
749 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
750 end
751
752 defp restrict_type(query, _), do: query
753
754 defp restrict_state(query, %{state: state}) do
755 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
756 end
757
758 defp restrict_state(query, _), do: query
759
760 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
761 from(
762 [_activity, object] in query,
763 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
764 )
765 end
766
767 defp restrict_favorited_by(query, _), do: query
768
769 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
770 raise "Can't use the child object without preloading!"
771 end
772
773 defp restrict_media(query, %{only_media: true}) do
774 from(
775 [activity, object] in query,
776 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
777 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
778 )
779 end
780
781 defp restrict_media(query, _), do: query
782
783 defp restrict_replies(query, %{exclude_replies: true}) do
784 from(
785 [_activity, object] in query,
786 where: fragment("?->>'inReplyTo' is null", object.data)
787 )
788 end
789
790 defp restrict_replies(query, %{
791 reply_filtering_user: %User{} = user,
792 reply_visibility: "self"
793 }) do
794 from(
795 [activity, object] in query,
796 where:
797 fragment(
798 "?->>'inReplyTo' is null OR ? = ANY(?)",
799 object.data,
800 ^user.ap_id,
801 activity.recipients
802 )
803 )
804 end
805
806 defp restrict_replies(query, %{
807 reply_filtering_user: %User{} = user,
808 reply_visibility: "following"
809 }) do
810 from(
811 [activity, object] in query,
812 where:
813 fragment(
814 """
815 ?->>'type' != 'Create' -- This isn't a Create
816 OR ?->>'inReplyTo' is null -- this isn't a reply
817 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
818 -- unless they are the author (because authors
819 -- are also part of the recipients). This leads
820 -- to a bug that self-replies by friends won't
821 -- show up.
822 OR ? = ? -- The actor is us
823 """,
824 activity.data,
825 object.data,
826 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
827 activity.recipients,
828 activity.actor,
829 activity.actor,
830 ^user.ap_id
831 )
832 )
833 end
834
835 defp restrict_replies(query, _), do: query
836
837 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
838 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
839 end
840
841 defp restrict_reblogs(query, _), do: query
842
843 defp restrict_muted(query, %{with_muted: true}), do: query
844
845 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
846 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
847
848 query =
849 from([activity] in query,
850 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
851 where:
852 fragment(
853 "not (?->'to' \\?| ?) or ? = ?",
854 activity.data,
855 ^mutes,
856 activity.actor,
857 ^user.ap_id
858 )
859 )
860
861 unless opts[:skip_preload] do
862 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
863 else
864 query
865 end
866 end
867
868 defp restrict_muted(query, _), do: query
869
870 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
871 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
872 domain_blocks = user.domain_blocks || []
873
874 following_ap_ids = User.get_friends_ap_ids(user)
875
876 query =
877 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
878
879 from(
880 [activity, object: o] in query,
881 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
882 where:
883 fragment(
884 "((not (? && ?)) or ? = ?)",
885 activity.recipients,
886 ^blocked_ap_ids,
887 activity.actor,
888 ^user.ap_id
889 ),
890 where:
891 fragment(
892 "recipients_contain_blocked_domains(?, ?) = false",
893 activity.recipients,
894 ^domain_blocks
895 ),
896 where:
897 fragment(
898 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
899 activity.data,
900 activity.data,
901 ^blocked_ap_ids
902 ),
903 where:
904 fragment(
905 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
906 activity.actor,
907 ^domain_blocks,
908 activity.actor,
909 ^following_ap_ids
910 ),
911 where:
912 fragment(
913 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
914 o.data,
915 ^domain_blocks,
916 o.data,
917 ^following_ap_ids
918 )
919 )
920 end
921
922 defp restrict_blocked(query, _), do: query
923
924 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
925 from(
926 activity in query,
927 where:
928 fragment(
929 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
930 activity.data,
931 ^[Constants.as_public()]
932 )
933 )
934 end
935
936 defp restrict_unlisted(query, _), do: query
937
938 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
939 from(activity in query, where: activity.id in ^ids)
940 end
941
942 defp restrict_pinned(query, _), do: query
943
944 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
945 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
946
947 from(
948 activity in query,
949 where:
950 fragment(
951 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
952 activity.data,
953 activity.actor,
954 ^muted_reblogs
955 )
956 )
957 end
958
959 defp restrict_muted_reblogs(query, _), do: query
960
961 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
962 from(
963 activity in query,
964 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
965 )
966 end
967
968 defp restrict_instance(query, _), do: query
969
970 defp restrict_filtered(query, %{user: %User{} = user}) do
971 case Filter.compose_regex(user) do
972 nil ->
973 query
974
975 regex ->
976 from([activity, object] in query,
977 where:
978 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
979 activity.actor == ^user.ap_id
980 )
981 end
982 end
983
984 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
985 restrict_filtered(query, %{user: user})
986 end
987
988 defp restrict_filtered(query, _), do: query
989
990 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
991
992 defp exclude_poll_votes(query, _) do
993 if has_named_binding?(query, :object) do
994 from([activity, object: o] in query,
995 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
996 )
997 else
998 query
999 end
1000 end
1001
1002 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1003
1004 defp exclude_chat_messages(query, _) do
1005 if has_named_binding?(query, :object) do
1006 from([activity, object: o] in query,
1007 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1008 )
1009 else
1010 query
1011 end
1012 end
1013
1014 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1015
1016 defp exclude_invisible_actors(query, _opts) do
1017 invisible_ap_ids =
1018 User.Query.build(%{invisible: true, select: [:ap_id]})
1019 |> Repo.all()
1020 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1021
1022 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1023 end
1024
1025 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1026 from(activity in query, where: activity.id != ^id)
1027 end
1028
1029 defp exclude_id(query, _), do: query
1030
1031 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1032
1033 defp maybe_preload_objects(query, _) do
1034 query
1035 |> Activity.with_preloaded_object()
1036 end
1037
1038 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1039
1040 defp maybe_preload_bookmarks(query, opts) do
1041 query
1042 |> Activity.with_preloaded_bookmark(opts[:user])
1043 end
1044
1045 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1046 query
1047 |> Activity.with_preloaded_report_notes()
1048 end
1049
1050 defp maybe_preload_report_notes(query, _), do: query
1051
1052 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1053
1054 defp maybe_set_thread_muted_field(query, opts) do
1055 query
1056 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1057 end
1058
1059 defp maybe_order(query, %{order: :desc}) do
1060 query
1061 |> order_by(desc: :id)
1062 end
1063
1064 defp maybe_order(query, %{order: :asc}) do
1065 query
1066 |> order_by(asc: :id)
1067 end
1068
1069 defp maybe_order(query, _), do: query
1070
1071 defp fetch_activities_query_ap_ids_ops(opts) do
1072 source_user = opts[:muting_user]
1073 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1074
1075 ap_id_relationships =
1076 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1077 [:block | ap_id_relationships]
1078 else
1079 ap_id_relationships
1080 end
1081
1082 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1083
1084 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1085 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1086
1087 restrict_muted_reblogs_opts =
1088 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1089
1090 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1091 end
1092
1093 def fetch_activities_query(recipients, opts \\ %{}) do
1094 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1095 fetch_activities_query_ap_ids_ops(opts)
1096
1097 config = %{
1098 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1099 }
1100
1101 Activity
1102 |> maybe_preload_objects(opts)
1103 |> maybe_preload_bookmarks(opts)
1104 |> maybe_preload_report_notes(opts)
1105 |> maybe_set_thread_muted_field(opts)
1106 |> maybe_order(opts)
1107 |> restrict_recipients(recipients, opts[:user])
1108 |> restrict_replies(opts)
1109 |> restrict_tag(opts)
1110 |> restrict_tag_reject(opts)
1111 |> restrict_tag_all(opts)
1112 |> restrict_since(opts)
1113 |> restrict_local(opts)
1114 |> restrict_actor(opts)
1115 |> restrict_type(opts)
1116 |> restrict_state(opts)
1117 |> restrict_favorited_by(opts)
1118 |> restrict_blocked(restrict_blocked_opts)
1119 |> restrict_muted(restrict_muted_opts)
1120 |> restrict_filtered(opts)
1121 |> restrict_media(opts)
1122 |> restrict_visibility(opts)
1123 |> restrict_thread_visibility(opts, config)
1124 |> restrict_reblogs(opts)
1125 |> restrict_pinned(opts)
1126 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1127 |> restrict_instance(opts)
1128 |> restrict_announce_object_actor(opts)
1129 |> restrict_filtered(opts)
1130 |> Activity.restrict_deactivated_users()
1131 |> exclude_poll_votes(opts)
1132 |> exclude_chat_messages(opts)
1133 |> exclude_invisible_actors(opts)
1134 |> exclude_visibility(opts)
1135 end
1136
1137 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1138 list_memberships = Pleroma.List.memberships(opts[:user])
1139
1140 fetch_activities_query(recipients ++ list_memberships, opts)
1141 |> Pagination.fetch_paginated(opts, pagination)
1142 |> Enum.reverse()
1143 |> maybe_update_cc(list_memberships, opts[:user])
1144 end
1145
1146 @doc """
1147 Fetch favorites activities of user with order by sort adds to favorites
1148 """
1149 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1150 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1151 user.ap_id
1152 |> Activity.Queries.by_actor()
1153 |> Activity.Queries.by_type("Like")
1154 |> Activity.with_joined_object()
1155 |> Object.with_joined_activity()
1156 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1157 |> order_by([like, _, _], desc_nulls_last: like.id)
1158 |> Pagination.fetch_paginated(
1159 Map.merge(params, %{skip_order: true}),
1160 pagination
1161 )
1162 end
1163
1164 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1165 Enum.map(activities, fn
1166 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1167 if Enum.any?(bcc, &(&1 in list_memberships)) do
1168 update_in(activity.data["cc"], &[user_ap_id | &1])
1169 else
1170 activity
1171 end
1172
1173 activity ->
1174 activity
1175 end)
1176 end
1177
1178 defp maybe_update_cc(activities, _, _), do: activities
1179
1180 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1181 from(activity in query,
1182 where:
1183 fragment("? && ?", activity.recipients, ^recipients) or
1184 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1185 ^Constants.as_public() in activity.recipients)
1186 )
1187 end
1188
1189 def fetch_activities_bounded(
1190 recipients,
1191 recipients_with_public,
1192 opts \\ %{},
1193 pagination \\ :keyset
1194 ) do
1195 fetch_activities_query([], opts)
1196 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1197 |> Pagination.fetch_paginated(opts, pagination)
1198 |> Enum.reverse()
1199 end
1200
1201 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1202 def upload(file, opts \\ []) do
1203 with {:ok, data} <- Upload.store(file, opts) do
1204 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1205
1206 Repo.insert(%Object{data: obj_data})
1207 end
1208 end
1209
1210 @spec get_actor_url(any()) :: binary() | nil
1211 defp get_actor_url(url) when is_binary(url), do: url
1212 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1213
1214 defp get_actor_url(url) when is_list(url) do
1215 url
1216 |> List.first()
1217 |> get_actor_url()
1218 end
1219
1220 defp get_actor_url(_url), do: nil
1221
1222 defp object_to_user_data(data) do
1223 avatar =
1224 data["icon"]["url"] &&
1225 %{
1226 "type" => "Image",
1227 "url" => [%{"href" => data["icon"]["url"]}]
1228 }
1229
1230 banner =
1231 data["image"]["url"] &&
1232 %{
1233 "type" => "Image",
1234 "url" => [%{"href" => data["image"]["url"]}]
1235 }
1236
1237 fields =
1238 data
1239 |> Map.get("attachment", [])
1240 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1241 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1242
1243 emojis =
1244 data
1245 |> Map.get("tag", [])
1246 |> Enum.filter(fn
1247 %{"type" => "Emoji"} -> true
1248 _ -> false
1249 end)
1250 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1251 {String.trim(name, ":"), url}
1252 end)
1253
1254 is_locked = data["manuallyApprovesFollowers"] || false
1255 capabilities = data["capabilities"] || %{}
1256 accepts_chat_messages = capabilities["acceptsChatMessages"]
1257 data = Transmogrifier.maybe_fix_user_object(data)
1258 is_discoverable = data["discoverable"] || false
1259 invisible = data["invisible"] || false
1260 actor_type = data["type"] || "Person"
1261
1262 public_key =
1263 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1264 data["publicKey"]["publicKeyPem"]
1265 else
1266 nil
1267 end
1268
1269 shared_inbox =
1270 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1271 data["endpoints"]["sharedInbox"]
1272 else
1273 nil
1274 end
1275
1276 user_data = %{
1277 ap_id: data["id"],
1278 uri: get_actor_url(data["url"]),
1279 ap_enabled: true,
1280 banner: banner,
1281 fields: fields,
1282 emoji: emojis,
1283 is_locked: is_locked,
1284 is_discoverable: is_discoverable,
1285 invisible: invisible,
1286 avatar: avatar,
1287 name: data["name"],
1288 follower_address: data["followers"],
1289 following_address: data["following"],
1290 bio: data["summary"] || "",
1291 actor_type: actor_type,
1292 also_known_as: Map.get(data, "alsoKnownAs", []),
1293 public_key: public_key,
1294 inbox: data["inbox"],
1295 shared_inbox: shared_inbox,
1296 accepts_chat_messages: accepts_chat_messages
1297 }
1298
1299 # nickname can be nil because of virtual actors
1300 if data["preferredUsername"] do
1301 Map.put(
1302 user_data,
1303 :nickname,
1304 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1305 )
1306 else
1307 Map.put(user_data, :nickname, nil)
1308 end
1309 end
1310
1311 def fetch_follow_information_for_user(user) do
1312 with {:ok, following_data} <-
1313 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1314 {:ok, hide_follows} <- collection_private(following_data),
1315 {:ok, followers_data} <-
1316 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1317 {:ok, hide_followers} <- collection_private(followers_data) do
1318 {:ok,
1319 %{
1320 hide_follows: hide_follows,
1321 follower_count: normalize_counter(followers_data["totalItems"]),
1322 following_count: normalize_counter(following_data["totalItems"]),
1323 hide_followers: hide_followers
1324 }}
1325 else
1326 {:error, _} = e -> e
1327 e -> {:error, e}
1328 end
1329 end
1330
1331 defp normalize_counter(counter) when is_integer(counter), do: counter
1332 defp normalize_counter(_), do: 0
1333
1334 def maybe_update_follow_information(user_data) do
1335 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1336 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1337 {_, true} <-
1338 {:collections_available,
1339 !!(user_data[:following_address] && user_data[:follower_address])},
1340 {:ok, info} <-
1341 fetch_follow_information_for_user(user_data) do
1342 info = Map.merge(user_data[:info] || %{}, info)
1343
1344 user_data
1345 |> Map.put(:info, info)
1346 else
1347 {:user_type_check, false} ->
1348 user_data
1349
1350 {:collections_available, false} ->
1351 user_data
1352
1353 {:enabled, false} ->
1354 user_data
1355
1356 e ->
1357 Logger.error(
1358 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1359 )
1360
1361 user_data
1362 end
1363 end
1364
1365 defp collection_private(%{"first" => %{"type" => type}})
1366 when type in ["CollectionPage", "OrderedCollectionPage"],
1367 do: {:ok, false}
1368
1369 defp collection_private(%{"first" => first}) do
1370 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1371 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1372 {:ok, false}
1373 else
1374 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1375 {:error, _} = e -> e
1376 e -> {:error, e}
1377 end
1378 end
1379
1380 defp collection_private(_data), do: {:ok, true}
1381
1382 def user_data_from_user_object(data) do
1383 with {:ok, data} <- MRF.filter(data) do
1384 {:ok, object_to_user_data(data)}
1385 else
1386 e -> {:error, e}
1387 end
1388 end
1389
1390 def fetch_and_prepare_user_from_ap_id(ap_id) do
1391 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1392 {:ok, data} <- user_data_from_user_object(data) do
1393 {:ok, maybe_update_follow_information(data)}
1394 else
1395 # If this has been deleted, only log a debug and not an error
1396 {:error, "Object has been deleted" = e} ->
1397 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1398 {:error, e}
1399
1400 {:error, {:reject, reason} = e} ->
1401 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1402 {:error, e}
1403
1404 {:error, e} ->
1405 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1406 {:error, e}
1407 end
1408 end
1409
1410 def maybe_handle_clashing_nickname(data) do
1411 with nickname when is_binary(nickname) <- data[:nickname],
1412 %User{} = old_user <- User.get_by_nickname(nickname),
1413 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1414 Logger.info(
1415 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1416 data[:ap_id]
1417 }, renaming."
1418 )
1419
1420 old_user
1421 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1422 |> User.update_and_set_cache()
1423 else
1424 {:ap_id_comparison, true} ->
1425 Logger.info(
1426 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1427 )
1428
1429 _ ->
1430 nil
1431 end
1432 end
1433
1434 def make_user_from_ap_id(ap_id) do
1435 user = User.get_cached_by_ap_id(ap_id)
1436
1437 if user && !User.ap_enabled?(user) do
1438 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1439 else
1440 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1441 if user do
1442 user
1443 |> User.remote_user_changeset(data)
1444 |> User.update_and_set_cache()
1445 else
1446 maybe_handle_clashing_nickname(data)
1447
1448 data
1449 |> User.remote_user_changeset()
1450 |> Repo.insert()
1451 |> User.set_cache()
1452 end
1453 end
1454 end
1455 end
1456
1457 def make_user_from_nickname(nickname) do
1458 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1459 make_user_from_ap_id(ap_id)
1460 else
1461 _e -> {:error, "No AP id in WebFinger"}
1462 end
1463 end
1464
1465 # filter out broken threads
1466 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1467 entire_thread_visible_for_user?(activity, user)
1468 end
1469
1470 # do post-processing on a specific activity
1471 def contain_activity(%Activity{} = activity, %User{} = user) do
1472 contain_broken_threads(activity, user)
1473 end
1474
1475 def fetch_direct_messages_query do
1476 Activity
1477 |> restrict_type(%{type: "Create"})
1478 |> restrict_visibility(%{visibility: "direct"})
1479 |> order_by([activity], asc: activity.id)
1480 end
1481 end