Merge branch '2391-async-bugs' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity as `{recipients, to, cc}`.
#
# `recipients` is the concatenation of the "to", "cc" and "bcc" lists (each
# defaults to `[]` when the key is absent). For "Create" activities the actor
# is added as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. The previous `[]` default was
  # wrapped into `[[]]`, which leaked a bogus `[]` entry into the recipients.
  actor_recipients =
    case Map.get(data, "actor") do
      absent when absent in [nil, []] -> []
      actor -> [actor]
    end

  recipients =
    [to, cc, bcc, actor_recipients]
    |> Enum.concat()
    |> Enum.uniq()

  {recipients, to, cc}
end

# Any other activity type: plain concatenation, duplicates are kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
54
# Decides whether an activity by `actor` may be inserted.
# A `nil` actor always passes; otherwise the cached user must exist and must
# not be deactivated.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
63
# Checks the object's "content" length against the configured
# `[:instance, :remote_limit]`. Activities without content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= max_length
end

defp check_remote_limit(_), do: true
70
@doc """
Increases the note count of `actor` through `User.increase_note_count/1` when
`object` is public; otherwise returns `{:ok, actor}` untouched.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Decreases the note count of `actor` through `User.decrease_note_count/1` when
`object` is public; otherwise returns `{:ok, actor}` untouched.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# For a "Create" whose object has an "inReplyTo", bumps the replies counter of
# the replied-to object, but only when the new object is public (returns `nil`
# otherwise). Anything else is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@doc """
Persists `object` and returns `{:ok, stored, meta}`.

Maps whose "type" is one of `@object_types` are stored with `Object.create/1`.
Everything else is inserted as an `Activity`; `meta` must then contain `:local`.
Any non-`{:ok, _}` result of the underlying calls is returned as-is.
"""
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities once the Note type is supported in the new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
@doc """
Inserts the activity described by `map`.

Steps, in order: return the already-known activity if `Activity.normalize/1`
finds one, apply defaults, check the actor (unless `bypass_actor_check`), check
the remote content limit, run MRF, compute recipients, check containment, store
the child object and finally store the activity (with its expiration job).

With `fake` set nothing is written: an `%Activity{}` with the id
`"pleroma:fakeid"` is built and returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(data["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data),
       {:ok, inserted} <- insert_activity_with_expiration(data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    # Rich media is fetched in a separate task, started through the limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # The activity is already known.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct in memory only.
    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = containment_error ->
      containment_error

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error
  end
end
167
# Inserts a new `%Activity{}` built from the arguments and, on success, hands it
# to `maybe_create_activity_expiration/1`. Insert failures are returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation, then
streams out the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# Enqueues a `PurgeExpiredActivity` job when the activity data carries a
# `%DateTime{}` under "expires_at". Returns `{:ok, activity}` on success (or when
# there is nothing to schedule); an enqueue failure is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation of `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`; if either lookup does not produce
# the expected shape, that unexpected value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
211
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`, and `[]` for any other input.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
219
@doc """
Preloads the `:user` of the given participations and pushes them to the
"participation" stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
@doc """
Streams the participations of the conversation that `object`'s "context"
belongs to, provided a latest direct activity id can be found for `user` in
that context. Objects without a context (or any other input) are a `:noop`.
"""
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      latest_direct_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if latest_direct_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Every other activity is a `:noop`.
"""
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
@doc """
Runs `do_create/2` inside a `Repo.transaction/1` and unwraps the result of a
committed transaction; a non-`{:ok, _}` transaction result is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
267
# Transaction body of `create/2`: builds the Create data, inserts it and then
# updates counters, notifies, streams and federates. Fake inserts and the
# `:benchmark` environment stop early (see the `else` clauses). An
# `{:error, _}` from any step rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only an explicit `false` counts as non-local
  local = params[:local] != false
  published = params[:published]
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: published, context: context, object: object}
    |> make_create_data(extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
300
@doc """
Builds listen data from `params`, inserts it, then notifies, streams and
federates the resulting activity. A failing step's value is returned as-is.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only an explicit `false` counts as non-local
  local = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
@doc """
Runs `do_unfollow/4` inside a `Repo.transaction/1` and unwraps the result of a
committed transaction; a non-`{:ok, _}` transaction result is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
329
# Transaction body of `unfollow/4`: marks the latest follow activity as
# "cancelled", inserts the unfollow activity, then notifies, streams and
# federates it. Returns `nil` when no follow activity exists and rolls back on
# `{:error, _}`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
343
@doc """
Runs `do_flag/1` inside a `Repo.transaction/1` and unwraps the result of a
committed transaction; a non-`{:ok, _}` transaction result is returned as-is.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
350
# Transaction body of `flag/1`: inserts the report, federates a stripped copy
# and e-mails the report to every superuser that has an e-mail address.
# The report is addressed (via "cc") to the reported account only when
# forwarding is enabled. Rolls back on `{:error, _}`.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only an explicit `false` counts as false
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(), not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
392
@doc """
Inserts a "Move" activity from `origin` to `target`.

Requires `origin.ap_id` to be listed in `target.also_known_as`. After a
successful insert the activity is streamed and federated and a
"move_following" background job is enqueued.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
418
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are the public collection plus, when `opts[:user]` is set, that user
and the accounts returned by `User.following/1`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  from(a in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_filtered(opts)
  |> where(
    [a],
    fragment("?->>'type' = ? and ?->>'context' = ?", a.data, "Create", a.data, ^context)
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
448
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
455
@doc """
Returns the id of the newest direct-visibility activity of `context`, or `nil`.

Preloading is skipped by default; a `:skip_preload` given in `opts` wins.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_only =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  direct_only
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
466
@doc """
Fetches a page of activities addressed to the public collection. `:user` is
dropped from `opts` first; unlisted activities are filtered out only when
`opts[:restrict_unlisted]` is `true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
476
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
483
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` SQL value
# equals `opts[:visibility]` (a single visibility or a list of them).
#
# An invalid visibility is logged and the query is returned unchanged, the same
# way `exclude_visibility/2` behaves. Previously these branches returned the
# `:ok` of `Logger.error/1`, which is not a queryable and broke every pipeline
# step after this one. A `nil` visibility simply means "no restriction".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
520
# Removes activities whose `activity_visibility(...)` SQL value matches
# `opts[:exclude_visibilities]` (a single visibility or a list of them).
# Invalid values are logged and leave the query unchanged; `nil` or a missing
# key means "exclude nothing".
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  case Enum.reject(visibility, &(&1 in @valid_visibilities)) do
    [] ->
      where(
        query,
        [a],
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
      )

    _invalid ->
      Logger.error("Could not exclude visibility to #{visibility}")
      query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
563
# Applies the `thread_visibility(...)` SQL check for the reading user, unless
# thread containment is skipped by the config map (third argument) or by the
# user's own `skip_thread_containment` flag. Without a user nothing is added.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}) do
  query
end

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config) do
  query
end

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
578
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type. The result of `fetch_activities/2` is reversed.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
593
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

Block and mute filtering for `reading_user` is applied only when
`reading_user` does not block `user`. Offset pagination is used when `params`
has an `:offset` key, keyset pagination otherwise. The result of
`fetch_activities/3` is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = if Map.has_key?(params, :offset), do: :offset, else: :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, pagination_type)
  |> Enum.reverse()
end
625
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination. The result of `fetch_activities/3` is reversed.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
637
# Recipient list used when reading activities: empty in godmode (an empty list
# makes `restrict_recipients/3` a no-op), otherwise the public collection plus,
# for a logged-in reader, the reader and the accounts they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
647
# Drops "Announce" activities whose announced object was authored by
# `opts[:announce_filtering_user]`. Needs the joined object, so combining it
# with `skip_preload: true` raises.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
667
# Keeps only activities with an id greater than `opts[:since_id]`; an empty
# string or a missing key adds no restriction.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
675
# Drops activities whose object carries any of the tags in `opts[:tag_reject]`
# (non-empty list). Needs the joined object, so `skip_preload: true` raises.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
688
# Keeps only activities whose object carries all of the tags in
# `opts[:tag_all]` (non-empty list). Needs the joined object, so
# `skip_preload: true` raises.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
701
# Keeps only activities whose object carries `opts[:tag]`: any of the tags for
# a list, exactly that tag for a binary. Needs the joined object, so
# `skip_preload: true` raises.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
721
# Keeps activities whose recipients overlap `recipients`. With a user, the
# user's own activities are additionally allowed through an `or_where`.
# An empty recipient list adds no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
735
# Keeps only local activities when `opts[:local_only]` is `true`.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
741
# Keeps only activities whose actor equals `opts[:actor_id]`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
747
# Keeps only activities of `opts[:type]`: an exact match for a binary,
# membership (`ANY`) for anything else, e.g. a list of types.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
757
# Keeps only activities whose data "state" equals `opts[:state]`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
763
# Keeps only activities whose object lists `opts[:favorited_by]` in "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
772
# With `only_media: true`, keeps only "Create" activities whose object
# "attachment" is not the empty list. Needs the joined object, so
# `skip_preload: true` raises.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
786
# Reply filtering, depending on the options:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and replies that have the
#     filtering user among the recipients;
#   * `reply_visibility: "following"` additionally accepts replies addressed to
#     the user's friends (see the SQL comments below).
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  accepted_recipients = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^accepted_recipients,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
840
# Drops "Announce" activities when `opts[:exclude_reblogs]` is `true`.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
846
# Hides activities authored by, or addressed ("to") to, users muted by
# `opts[:muting_user]`; the user's own activities stay visible. Unless
# preloading is skipped, rows matched by the `:thread_mute` binding are hidden
# as well. `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the preloaded list handed in by the caller.
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^muted_ap_ids,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    from([thread_mute: tm] in without_muted_users, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
873
# Hides content related to users and domains blocked by
# `opts[:blocking_user]`. The object is joined first when the query does not
# have an `:object` binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  # Prefer the preloaded list handed in by the caller.
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  blocked_domains = user.domain_blocks || []
  friend_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  from(
    [a, object: o] in with_object,
    # the actor is blocked
    where: fragment("not (? = ANY(?))", a.actor, ^blocked_ap_ids),
    # a recipient is blocked, unless the activity is the user's own
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        a.recipients,
        ^blocked_ap_ids,
        a.actor,
        ^user.ap_id
      ),
    where:
      fragment(
        "recipients_contain_blocked_domains(?, ?) = false",
        a.recipients,
        ^blocked_domains
      ),
    # announces addressed to a blocked user
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        a.data,
        a.data,
        ^blocked_ap_ids
      ),
    # the activity actor is on a blocked domain, unless followed
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        a.actor,
        ^blocked_domains,
        a.actor,
        ^friend_ap_ids
      ),
    # the object actor is on a blocked domain, unless followed
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^blocked_domains,
        o.data,
        ^friend_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
927
# With `restrict_unlisted: true`, drops activities that have the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
941
# With `pinned: true`, keeps only the activities listed in
# `opts[:pinned_activity_ids]`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
947
# Drops "Announce" activities made by users whose reblogs are muted by
# `opts[:muting_user]`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the preloaded list handed in by the caller.
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
964
# Keeps only activities whose actor URL has `opts[:instance]` as its third
# "/"-separated part.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _), do: query
973
# Hides activities whose object "content" matches the regex composed by
# `Filter.compose_regex/1` for the user; the user's own activities are kept.
# The user is taken from `:user`, falling back to `:blocking_user`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
993
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true`. Only applied when the query has an `:object`
# binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1005
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true`. Only applied when the query has an `:object`
# binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(
      query,
      [activity, object: o],
      fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
    )
  else
    query
  end
end
1017
# Drops activities authored by invisible users, unless `invisible_actors: true`.
# The invisible ap_ids are loaded with a separate query on every call.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1028
# Drops the activity with the id `opts[:exclude_id]` (binaries only).
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1034
# Preloads the child object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1041
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1048
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1055
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1062
# Orders by id when `opts[:order]` is `:desc` or `:asc`; otherwise the query is
# left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1074
# Loads the mute / reblog-mute (and, when the blocking user is the muting user,
# block) ap_ids with a single `User.outgoing_relationships_ap_ids/2` call and
# returns three option maps for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2`. Values already present in `opts` are kept.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  ap_ids_by_type = User.outgoing_relationships_ap_ids(source_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, ap_ids_by_type[:block]),
    Map.put_new(opts, :muted_users_ap_ids, ap_ids_by_type[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, ap_ids_by_type[:reblog_mute])
  }
end
1096
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion selected by `opts`.

Block, mute and reblog-mute restrictions receive option maps enriched with
preloaded ap_id lists (see `fetch_activities_query_ap_ids_ops/1`).
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second `restrict_filtered/2` further down only
  # repeated the same WHERE condition and the filter regex composition.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1140
@doc """
Runs the activities query for `recipients` plus the list memberships of
`opts[:user]`, paginates it, reverses the fetched page and finally lets
`maybe_update_cc/3` adjust the activities for list-addressed posts.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  memberships = Pleroma.List.memberships(opts[:user])

  activities =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, memberships, opts[:user])
end
1149
@doc """
Fetches the activities favourited by `user`.

The result is ordered by the id of the corresponding `Like` activity
(descending, nulls last), and that id is also exposed as `pagination_id`
on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already applied above, so pagination is told to skip its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1167
# Prepends the viewing user's ap_id to the "cc" of every activity whose
# non-empty "bcc" contains at least one of `list_memberships`.
#
# Activities are returned unchanged when there are no list memberships, when
# no `%User{}` is given, or when an activity has no non-empty "bcc" list.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (the updater then receives nil) or hold a single
        # value; List.wrap/1 keeps the result a proper list in every case
        # instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1183
# Narrows `query` to activities whose recipients overlap `recipients`, or
# whose recipients overlap `recipients_with_public` while also containing the
# public address (`Constants.as_public/0`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1192
@doc """
Fetches a page of activities using the bounded recipient condition of
`fetch_activities_bounded_query/3` instead of the regular recipient
restriction (the base query is built with an empty recipient list).

The fetched page is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1204
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data, with `"actor"` set from `opts[:actor]` when present.

Anything other than `{:ok, data}` from the store step is returned as-is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1213
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map with a string "href", or a list whose first element is one of
# those. Anything else (including an empty list) yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1225
# Converts a remote actor object (a string-keyed map) into the atom-keyed
# attribute map used to create or update a user.
#
# Remote data is untrusted, so malformed optional parts are skipped instead of
# crashing the whole conversion:
#   * "icon" / "image" that are not maps with a "url" yield no avatar / banner;
#   * "attachment" and "tag" may be missing, null, a single object or a list;
#   * attachments without a "type" are ignored (same as the emoji filter);
#   * emoji tags without an icon URL or a string name are ignored.
#
# `:nickname` is `"<preferredUsername>@<host of id>"`, or nil when the actor
# has no "preferredUsername".
defp object_to_user_data(data) do
  avatar = user_image_from_object(data["icon"])
  banner = user_image_from_object(data["image"])

  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => type} -> type == "PropertyValue"
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # The generator pattern silently skips tags that are not well-formed emoji.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # These two are read from the object as received, before it is passed
  # through `Transmogrifier.maybe_fix_user_object/1`.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  if data["preferredUsername"] do
    Map.put(
      user_data,
      :nickname,
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    )
  else
    Map.put(user_data, :nickname, nil)
  end
end

# Builds the stored "Image" representation from an actor's "icon"/"image"
# value; returns nil when the value is not a map carrying a "url".
defp user_image_from_object(%{"url" => url}) when url not in [nil, false] do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp user_image_from_object(_), do: nil
1314
@doc """
Fetches the remote following and followers collections of `user` and
summarises them.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`
(as decided by `collection_private/1`) and `:following_count`,
`:follower_count` (the collections' `"totalItems"`, or 0 when that is not an
integer). Any failing step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"])
    }

    {:ok, follow_info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1334
# Returns the counter itself when it is an integer, and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1337
@doc """
Enriches `user_data` with remote follower/following information.

The lookup only happens when `[:instance, :external_user_synchronization]`
is enabled, the actor type is `"Person"` or `"Service"`, and both collection
addresses are present. On success the fetched information is merged into
`user_data[:info]`; in every other case `user_data` is returned unchanged
(unexpected failures are logged).
"""
def maybe_update_follow_information(user_data) do
  # `object_to_user_data/1` stores the actor type under `:actor_type`, so
  # checking only `:type` made this check fail for every map it produced.
  # `:type` is still honoured for callers that build the map themselves.
  actor_type = user_data[:actor_type] || user_data[:type]

  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, actor_type in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <-
         fetch_follow_information_for_user(user_data) do
    # NOTE(review): the result is nested under `:info` while the rest of
    # `user_data` is flat — confirm that the consumer of this map reads `:info`.
    info = Map.merge(user_data[:info] || %{}, info)

    user_data
    |> Map.put(:info, info)
  else
    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    {:enabled, false} ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1368
# Decides whether a follow collection should be treated as private.
#
#   * An embedded "first" page of type CollectionPage / OrderedCollectionPage
#     means the collection is visible: `{:ok, false}`.
#   * Any other "first" value is fetched; a page of those types gives
#     `{:ok, false}`, a 401/403 response gives `{:ok, true}`, and every other
#     outcome is returned as `{:error, reason}`.
#   * A collection without "first" is treated as private: `{:ok, true}`.
defp collection_private(collection) do
  case collection do
    %{"first" => %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    %{"first" => first} ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _} = error ->
          error

        other ->
          {:error, other}
      end

    _ ->
      {:ok, true}
  end
end
1385
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user attributes.

Returns `{:ok, user_data}`, or `{:error, result}` where `result` is whatever
non-`{:ok, _}` value the filter produced.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1393
@doc """
Fetches the remote actor at `ap_id`, converts it into user attributes and
passes them through `maybe_update_follow_information/1`.

Returns `{:ok, user_data}` or `{:error, reason}`. The log level of a failure
depends on the reason: debug for a deleted object, info for a rejection,
error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted actor is an expected situation, so it is only logged at debug level.
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, rejection}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1413
@doc """
Frees up the nickname in `data` when an existing user with a different ap_id
already holds it, by renaming that user to `"<id>.<nickname>"`.

Nothing is changed when `data` has no string nickname, when no user holds it,
or when the holder has the same ap_id as `data` (which is only logged).
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1437
@doc """
Creates or refreshes the user identified by `ap_id`.

  * A cached user for which `User.ap_enabled?/1` is false is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Any other cached user is updated with freshly fetched data.
  * Without a cached user, a new one is inserted, after
    `maybe_handle_clashing_nickname/1` has been given the fetched data.

A failed fetch is returned as-is.
"""
def make_user_from_ap_id(ap_id) do
  cached = User.get_cached_by_ap_id(ap_id)

  cond do
    cached && !User.ap_enabled?(cached) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    cached ->
      with {:ok, attrs} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        cached
        |> User.remote_user_changeset(attrs)
        |> User.update_and_set_cache()
      end

    true ->
      with {:ok, attrs} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        maybe_handle_clashing_nickname(attrs)

        attrs
        |> User.remote_user_changeset()
        |> Repo.insert()
        |> User.set_cache()
      end
  end
end
1460
@doc """
Resolves `nickname` through WebFinger and builds the user from the ap_id
found there. Returns `{:error, "No AP id in WebFinger"}` when the lookup does
not yield a non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1468
# Used to filter out broken threads: delegates to
# `entire_thread_visible_for_user?/2` for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1473
# Post-processing entry point for a single activity; currently this only runs
# the broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1478
@doc """
Builds the query for `Create` activities restricted to `"direct"` visibility,
ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1485 end