Notifications: Make notifications save their type.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
17 alias Pleroma.Repo
18 alias Pleroma.Upload
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
# Computes `{recipients, to, cc}` for an activity map.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
# Addresses that do not resolve to a cached user are always kept.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  keep? = fn address ->
    candidate = User.get_cached_by_ap_id(address)
    is_nil(candidate) or User.following?(candidate, announcer)
  end

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(keep?)

  {recipients, to, cc}
end

# Create activities additionally address their own actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  recipients = Enum.uniq(Enum.concat([to, cc, bcc, [actor]]))

  {recipients, to, cc}
end

# Every other activity type: plain concatenation of "to", "cc" and "bcc".
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
69
# Tells whether an activity attributed to `actor` (an AP id, or `nil`) may be inserted.
#
# * `nil` actor           -> `true` (nothing to check)
# * user found, with `deactivated: false` -> `true`
# * anything else         -> `false`
#
# The lookup result is matched against `%User{}` so that an actor that cannot
# be resolved yields `false` rather than raising on `nil.deactivated`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{deactivated: false} -> true
    _ -> false
  end
end
82
# Checks the embedded object's "content" length against the configured
# `[:instance, :remote_limit]`. Activities without such content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= max_length
end

defp check_remote_limit(_activity_data), do: true
89
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
93
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
97
# For a Create whose object carries "inReplyTo": increments the replies count of
# the replied-to object, but only when the new object is public (returns `nil`
# otherwise). Any other input is a `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
108
# For an object carrying "inReplyTo": decrements the replies count of the
# replied-to object, but only when the object is public (returns `nil`
# otherwise). Any other input is a `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
118
# A Create whose object has both "inReplyTo" and "name" is treated as a vote:
# the vote count for option `name` on the replied-to object is increased on
# behalf of `actor`. Any other input is a `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "actor" => voter,
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
128
@object_types ["ChatMessage"]

# Persists `object` and hands back `{:ok, record, meta}`.
#
# * Types listed in `@object_types` are stored through `Object.create/1`.
# * Everything else is inserted as an `Activity`, with recipients computed by
#   `get_recipients/1` and `local` taken from `meta` (`Keyword.fetch!/2` raises
#   if `:local` is missing).
#
# A non-matching result of `Object.create/1` or `Repo.insert/1` falls through
# the `with` unchanged (presumably an `{:error, _}` tuple).
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
150
# Inserts an activity built from `map`.
#
# Steps, in order: return the already-known activity if `Activity.normalize/1`
# finds one; fill in defaults; check the actor (unless bypassed); check the
# remote content limit; run MRF; compute recipients; short-circuit for fake
# activities; containment check; insert the child object; insert the activity.
#
# With `fake` set, nothing is written and an activity with the id
# "pleroma:fakeid" is returned. Any other non-matching step is wrapped as
# `{:error, value}`.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {addressees, _, _} = get_recipients(data),
       {:fake, false, data, addressees} <- {:fake, fake, data, addressees},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: addressees
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, addressees} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: addressees,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
201
# Creates notifications for `activity`, creates or bumps its conversation, then
# streams out the activity followed by the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
210
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`; when either lookup does not match,
# that non-matching value is returned as-is.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
       %User{} = acting_user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(acting_user, conversation)
    {:ok, conversation}
  end
end
218
# Returns the freshly (force-)preloaded participations of a conversation, or an
# empty list when no `{:ok, conversation}` was given.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
226
# Preloads each participation's user and hands the result to the streamer with
# "participation" as its first argument.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

# Streams the participations of the conversation matching the object's context,
# provided `user` can still see at least one activity in that context.
# Returns `:noop` for inputs without a context.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  with %Conversation{} = found <- Conversation.get_for_ap_id(context) do
    conversation = Repo.preload(found, :participations)

    latest_id =
      fetch_latest_activity_id_for_context(conversation.ap_id, %{
        "user" => user,
        "blocking_user" => user
      })

    if latest_id, do: stream_out_participations(conversation.participations)
  end
end

def stream_out_participations(_, _), do: :noop
250
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; everything else is a `:noop`.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
261
# Runs `do_create/2` inside a transaction and unwraps the transaction result.
# A failed transaction (e.g. after a rollback) is returned unchanged.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failed -> failed
  end
end
268
# Transaction body of `create/2`: builds the Create data, inserts it, updates
# reply / poll / note counters, notifies, streams and federates.
#
# Fake activities stop right after insertion. When `[:env]` is configured as
# `:benchmark`, processing stops after the reply and vote counters ("quick
# insert"). An `{:error, message}` from any step rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published_at = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published_at, context: context, object: object},
      extra
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, quick_activity} ->
      {:ok, quick_activity}

    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
301
# Builds listen data from `params`, inserts it, then notifies, streams and
# federates. `params[:local]` counts as true unless it is exactly `false`.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      extra
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
# Shorthand for `accept_or_reject/2` with the "Accept" type.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
325
# Shorthand for `accept_or_reject/2` with the "Reject" type.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
330
# Inserts an activity of the given `type` addressed to `to`, authored by
# `actor.ap_id` and pointing at `object`, then notifies, streams and federates.
# `:local` defaults to `true`; an "id" is only set when `:activity_id` is given.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      Map.get(params, :activity_id)
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
345
# Inserts an "Update" activity built from `to`, `cc`, `actor` and `object`,
# then notifies, streams and federates. `params[:local]` counts as true unless
# it is exactly `false`; an "id" is only set when `:activity_id` is given.
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    Utils.maybe_put(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
365
# Runs `do_follow/4` inside a transaction and unwraps the transaction result.
# A failed transaction is returned unchanged.
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failed -> failed
  end
end
374
# Transaction body of `follow/4`: inserts the follow activity, notifies,
# streams and federates. An `{:error, _}` rolls the transaction back.
defp do_follow(follower, followed, activity_id, local) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
385
# Runs `do_unfollow/4` inside a transaction and unwraps the transaction result
# (which is `nil` when there was no follow to undo). A failed transaction is
# returned unchanged.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failed -> failed
  end
end
394
# Transaction body of `unfollow/4`: marks the latest follow activity as
# "cancelled", inserts the unfollow activity, notifies, streams and federates.
# Returns `nil` when no follow activity exists; an `{:error, _}` rolls back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
408
# Runs `do_block/4` inside a transaction and unwraps the transaction result.
# A failed transaction is returned unchanged.
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    {:ok, result} -> result
    failed -> failed
  end
end
417
# Transaction body of `block/4`. When `[:activitypub, :unfollow_blocked]` is
# enabled and the blocker currently follows the blocked user, that follow is
# undone first (its result is ignored). Then the block activity is inserted,
# notified, streamed and federated. An `{:error, _}` rolls back.
defp do_block(blocker, blocked, activity_id, local) do
  if Config.get([:activitypub, :unfollow_blocked]) && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
435
# Files a report (Flag activity).
#
# The flag is always inserted with an empty "to"; "cc" holds the reported
# account's AP id unless `params[:forward]` is exactly `false`. The activity
# that gets federated is the one returned by `strip_report_status_data/1`.
# Afterwards a report e-mail is queued for every superuser that has an e-mail
# address. Returns the unstripped activity.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  base_additional = params[:additional] || %{}
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(base_additional, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(), not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
475
# Announces that `origin` moved to `target`.
#
# Requires `origin.ap_id` to be listed in `target.also_known_as`. On success the
# Move activity is inserted, notified, streamed and federated, and a
# "move_following" background job is enqueued for the two accounts.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local) do
    notify_and_stream(activity)
    maybe_federate(activity)

    BackgroundWorker.enqueue("move_following", %{
      "origin_id" => origin.id,
      "target_id" => target.id
    })

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
501
# Builds the query for Create activities belonging to `context`, newest first.
#
# Recipients are the public address plus, when `opts["user"]` is set, that
# user's AP id and whatever `User.following/1` returns for them. Blocks, poll
# votes and `opts["exclude_id"]` are honoured through the respective helpers.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  reader = opts["user"]

  recipients =
    if reader do
      [reader.ap_id | User.following(reader)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, reader)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
530
# Returns all activities matched by `fetch_activities_for_context_query/2`.
#
# `opts` must be a map with string keys: the query builder reads it with
# `opts["user"]`, which raises on a keyword list, so the spec no longer
# advertises `keyword()`.
@spec fetch_activities_for_context(String.t(), map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(opts)
  |> Repo.all()
end
537
# Returns the id of the newest activity in `context` visible under `opts`, or
# `nil` when there is none.
#
# Preloading is skipped by default ("skip_preload" => true) but can be
# overridden through `opts`. `opts` must be a map: it is combined with the
# defaults via `Map.merge/2`, which does not accept keyword lists, so the spec
# no longer advertises `keyword()`.
@spec fetch_latest_activity_id_for_context(String.t(), map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
547
# Fetches a page of activities addressed to the public collection. The "user"
# option is always dropped; unlisted activities are filtered out only when
# `opts["restrict_unlisted"]` is truthy.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public_query = fetch_activities_query([Constants.as_public()], opts)

  filtered_query =
    if opts["restrict_unlisted"], do: restrict_unlisted(public_query), else: public_query

  Pagination.fetch_paginated(filtered_query, opts, pagination)
end
563
# Same as `fetch_public_or_unlisted_activities/2`, with "restrict_unlisted"
# forced to `true`.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, "restrict_unlisted", true), pagination)
end
570
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` equals the
# requested `:visibility` (a single value) or is one of them (a list).
#
# An invalid visibility is logged and the query is returned unrestricted, the
# same way `exclude_visibility/2` treats invalid input. Previously these
# branches returned the result of `Logger.error/1` instead of a query. The
# offending value is logged with `inspect/1` so values that do not implement
# `String.Chars` cannot make the log call itself raise.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
610
# Removes activities whose `activity_visibility(...)` matches the value(s) given
# under "exclude_visibilities". Invalid values are logged and the query is
# returned unchanged; `nil` or a missing key leaves the query untouched.
defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
653
# Applies the `thread_visibility(...)` SQL check for the requesting user, unless
# thread containment is skipped by the passed config or by the user's own
# `skip_thread_containment` flag. Without a user the query is left untouched.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
672
# Fetches activities authored by `user` as seen by `reading_user`, without any
# type restriction, and returns them in reversed fetch order.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{"user" => reading_user, "actor_id" => user.ap_id})

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
688
# Fetches Create and Announce activities authored by `user` as seen by
# `reading_user`, in reversed fetch order.
#
# Block and mute filtering for `reading_user` is applied unless
# `User.blocks?(reading_user, user)` is true, in which case it is left out.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "pinned_activity_ids" => user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{
        "blocking_user" => reading_user,
        "muting_user" => reading_user
      })
    end

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
715
# Fetches Create and Announce activities visible to `reading_user` using
# `:offset` pagination, in reversed fetch order.
def fetch_statuses(reading_user, params) do
  params = Map.put(params, "type", ["Create", "Announce"])

  %{"godmode" => params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
730
# Recipient list used when listing a user's activities: empty in "godmode",
# otherwise the public address plus, for a signed-in reader, their own AP id
# and whatever `User.following/1` returns for them.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = [Constants.as_public()]

  if reading_user do
    public ++ [reading_user.ap_id | User.following(reading_user)]
  else
    public
  end
end
742
# Keeps only activities with an id greater than "since_id". An empty string or
# a missing key leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
750
# Drops activities whose object carries any of the "tag_reject" tags. Needs the
# joined object, hence the raise when preloading is skipped.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
764
# Keeps only activities whose object carries every one of the "tag_all" tags.
# Needs the joined object, hence the raise when preloading is skipped.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
778
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, hence the raise when
# preloading is skipped.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\? (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, _opts), do: query
798
# Keeps activities whose recipients overlap with `recipients`. With a user, the
# user's own activities are additionally let through via `or_where`. An empty
# recipient list disables the restriction altogether.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
812
# Keeps only local activities when "local_only" is `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
818
# Keeps only activities authored by "actor_id", when given.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
824
# Keeps only activities of the given "type": a binary is compared for equality,
# any other value is passed to `ANY(...)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _opts), do: query
834
# Keeps only activities whose data "state" equals the given "state", when given.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
840
# Keeps only activities whose object lists "favorited_by" in its "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
849
# With "only_media" set to true / "true" / "1", keeps only activities whose
# object's "attachment" differs from the empty list. Needs the joined object,
# hence the raise when preloading is skipped.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
862
# Reply filtering:
#
# * "exclude_replies" (true / "true" / "1"): only objects without "inReplyTo".
# * "reply_visibility" => "self": non-replies, or activities that have the
#   filtering user among their recipients.
# * "reply_visibility" => "following": non-replies, activities whose recipients
#   (minus the activity's own actor) overlap with the user and their cached
#   friends, or activities authored by the user.
defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
906
# With "exclude_reblogs" set to true / "true" / "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
912
# Hides activities authored by, or addressed ("to") to, users muted by
# "muting_user". Unless preloading is skipped, rows that have a thread mute
# joined (`thread_mute` binding) are filtered out as well. "with_muted"
# (true / "true" / "1") disables the whole restriction.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts["skip_preload"] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
932
# Hides activities the "blocking_user" should not see:
#
# * authored by, or with recipients among, blocked users;
# * Announces addressed ("to") to blocked users;
# * activities or objects whose actor lives on a blocked domain, unless that
#   actor is among `User.get_friends_ap_ids/1` for the user.
#
# The object join is added on demand when the query has no `:object` binding.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
  blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
973
# Drops activities that carry the public address in their "cc" (a missing "cc"
# is treated as empty through `coalesce`).
defp restrict_unlisted(query) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public
    )
  )
end
985
# TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
# the same for `restrict_media/2`, `restrict_replies/2`, `restrict_reblogs/2`
# and `restrict_muted/2`

# With "pinned" set to true / "true" / "1", keeps only activities whose id is in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => ids})
     when pinned in [true, "true", "1"] do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
996
# Hides Announce activities authored by users whose reblogs the "muting_user"
# has muted. The list may be supplied up front as "reblog_muted_users_ap_ids".
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
  muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1013
# Keeps only activities authored by users whose nickname ends in
# "@<instance>". The matching AP ids are loaded with a separate query first.
defp restrict_instance(query, %{"instance" => instance}) do
  nickname_pattern = "%@#{instance}"

  actor_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^actor_ap_ids)
end

defp restrict_instance(query, _opts), do: query
1027
# Filters out activities whose object is of type "Answer", unless
# "include_poll_votes" is `true`. Only applies when the query has an `:object`
# binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1039
# Filters out activities whose object is of type "ChatMessage", unless
# "include_chat_messages" is `true`. Only applies when the query has an
# `:object` binding.
defp exclude_chat_messages(query, %{"include_chat_messages" => true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [activity, object: o],
        fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
      )

    false ->
      query
  end
end
1051
# Leaves out the activity with the given binary "exclude_id", when given.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1057
# Preloads the activity's object unless "skip_preload" is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1064
# Preloads the bookmark of `opts["user"]` unless "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
1071
# Preloads report notes only when "preload_report_notes" is `true`.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1078
# Sets the thread-muted field for the muting user (falling back to
# `opts["user"]`) unless "skip_preload" is `true`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1085
# Orders by id according to the `:order` option (`:desc` or `:asc`); any other
# input leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1097
# Looks up the muting user's outgoing relationship ap_ids once and returns three
# copies of opts, each with one preloaded list added as a default:
#
#   {blocked opts, muted opts, reblog-muted opts}
#
# :mute and :reblog_mute are requested when opts["muting_user"] is set; :block is
# requested only when opts["blocking_user"] is set and equal to the muting user.
# Values already present in opts win over the preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts["muting_user"]
  blocking_user = opts["blocking_user"]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  block_relationships =
    if blocking_user && blocking_user == muting_user, do: [:block], else: []

  ap_ids =
    User.outgoing_relationships_ap_ids(
      muting_user,
      mute_relationships ++ block_relationships
    )

  {
    Map.merge(%{"blocked_users_ap_ids" => ap_ids[:block]}, opts),
    Map.merge(%{"muted_users_ap_ids" => ap_ids[:mute]}, opts),
    Map.merge(%{"reblog_muted_users_ap_ids" => ap_ids[:reblog_mute]}, opts)
  }
end
1120
@doc """
Builds the activity query for `recipients`, without running it.

Starting from `Activity`, the query is passed through every preload, ordering,
`restrict_*` and `exclude_*` helper of this module in a fixed order; each helper
decides from `opts` whether it changes the query. Block, mute and reblog-mute
restrictions receive `opts` extended with relationship ap_ids preloaded by
`fetch_activities_query_ap_ids_ops/1`.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # The helper order is kept exactly as-is: later helpers may rely on joins or
  # named bindings added by earlier ones.
  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_visibility(opts)
end
1160
@doc """
Runs the activity query for `recipients` and returns one page of activities.

The list memberships of `opts["user"]` are appended to `recipients`. The
paginated result is reversed, then passed through `maybe_update_cc/3` together
with those memberships and the user.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(page, memberships, user)
end
1169
@doc """
Fetches the activities a user has favourited, most recently favourited first.

The query starts from the user's `Like` activities, joins each like's object and
that object's activity, and selects the joined activity with the object put in
its `object` field. Results are ordered by the id of the `Like` (descending,
nulls last), so pagination is told to skip its own ordering.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  paging_params = Map.merge(params, %{"skip_order" => true})

  Pagination.fetch_paginated(favourites_query, paging_params, pagination, :object_activity)
end
1188
# Adds the user's ap_id to the "cc" of every activity whose "bcc" contains one
# of the user's list memberships.
#
# Activities are returned unchanged when there are no list memberships, when the
# third argument is not a %User{}, or when an activity has no non-empty "bcc"
# list.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent, nil or a single string; List.wrap/1 keeps the
        # result a proper list instead of consing onto a non-list tail.
        update_in(activity.data["cc"], fn cc -> [user_ap_id | List.wrap(cc)] end)
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1205
@doc """
Restricts `query` to activities addressed to the given recipients.

An activity is kept when its recipients overlap `recipients`, or when they
overlap `recipients_with_public` and also contain the public address
(`Constants.as_public/0`).
"""
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1214
@doc """
Fetches one page of activities using the bounded recipient restriction.

The base query is built with an empty recipient list, narrowed with
`fetch_activities_bounded_query/3`, paginated, and the page is reversed before
it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1226
@doc """
Stores `file` through `Upload.store/2` and inserts an object holding the result.

When `opts[:actor]` is set it is recorded under the "actor" key of the object
data. Any non-`{:ok, data}` result of `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data =
        case opts[:actor] do
          missing when missing in [nil, false] -> data
          actor -> Map.put(data, "actor", actor)
        end

      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1240
# Extracts a URL string from an actor's "url" value.
#
# Accepts a plain string, a map with a binary "href", or a list of such values
# (only the first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _unsupported -> nil
  end
end
1252
# Builds the attribute map for a remote user from an ActivityPub actor object.
#
# Avatar, banner, profile fields, custom emoji and the `locked` flag are read
# from the object as received; every other attribute is read after
# Transmogrifier.maybe_fix_user_object/1 has been applied to it.
#
# The object originates from a remote server, so malformed "icon"/"image",
# "attachment" and "tag" values are skipped instead of raising.
#
# Always returns {:ok, user_data}.
defp object_to_user_data(data) do
  avatar = image_from_actor_field(data["icon"])
  banner = image_from_actor_field(data["image"])
  fields = profile_fields_from_actor(data)
  emojis = emojis_from_actor(data)

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: data["discoverable"] || false,
    invisible: data["invisible"] || false,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: data["type"] || "Person",
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }

  {:ok, user_data}
end

# Wraps the "url" of an "icon"/"image" value in the stored image map. A list
# contributes its first element; a value without a usable "url" yields nil.
defp image_from_actor_field(%{"url" => url}) when url not in [nil, false] do
  %{"type" => "Image", "url" => [%{"href" => url}]}
end

defp image_from_actor_field([first | _rest]), do: image_from_actor_field(first)
defp image_from_actor_field(_other), do: nil

# Collects the "name"/"value" pairs of all "PropertyValue" attachments, ignoring
# attachments of other types and entries that are not maps with a "type".
defp profile_fields_from_actor(data) do
  attachments = data |> Map.get("attachment") |> List.wrap()

  for %{"type" => "PropertyValue"} = field <- attachments do
    Map.take(field, ["name", "value"])
  end
end

# Maps custom emoji shortcodes (surrounding colons trimmed) to their icon URLs,
# ignoring tags that are not well-formed "Emoji" entries.
defp emojis_from_actor(data) do
  tags = data |> Map.get("tag") |> List.wrap()

  for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <- tags,
      is_binary(name),
      into: %{} do
    {String.trim(name, ":"), url}
  end
end
1341
@doc """
Fetches the follower and following collections of `user` and summarises them.

On success returns `{:ok, map}` with `:follower_count` and `:following_count`
(taken from each collection's "totalItems", 0 when that is not an integer) and
`:hide_followers` / `:hide_follows` (as reported by `collection_private/1`).
Any failing step is returned as an `{:error, reason}` tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_information = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers,
      hide_follows: hide_follows
    }

    {:ok, follow_information}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1361
# Returns the counter unchanged when it is an integer, and 0 for anything else
# (nil, strings, floats, ...).
defp normalize_counter(counter) do
  case counter do
    count when is_integer(count) -> count
    _not_a_count -> 0
  end
end
1364
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

The fetch only happens when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is "Person" or "Service", and both the following and
follower addresses are present. When any of those checks is `false` the data is
returned untouched; any other failure is logged and the data is returned
untouched as well.
"""
# NOTE(review): object_to_user_data/1 in this module emits :actor_type, not
# :type, so data built there never passes the type check -- confirm whether
# callers are expected to supply :type.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, follow_info)
    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:enabled, :user_type_check, :collections_available] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1395
# Decides whether a follower/following collection is private.
#
#   * "first" is an embedded (Ordered)CollectionPage -> {:ok, false}
#   * "first" is anything else -> it is fetched; a page result gives
#     {:ok, false}, a 401/403 response gives {:ok, true}, every other outcome is
#     returned as an {:error, reason} tuple
#   * no "first" at all -> {:ok, true}
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1412
@doc """
Runs an actor object through `MRF.filter/1` and converts it to user attributes.

Returns `{:ok, user_data}` on success. Whatever non-matching value a step
produces is wrapped as `{:error, value}`.
"""
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = result <- object_to_user_data(filtered) do
    result
  else
    failure -> {:error, failure}
  end
end
1421
@doc """
Fetches the actor at `ap_id` and prepares the attributes for a remote user.

Returns `{:ok, user_data}` on success. A failing step is logged (at debug level
for a deleted object, at error level otherwise) and returned wrapped in an extra
`{:error, _}` tuple, i.e. `{:error, {:error, reason}}` for error tuples.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = deleted ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(deleted)}")
      {:error, deleted}

    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1437
@doc """
Creates or refreshes the user record for the actor at `ap_id`.

A cached user that is not yet AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched; an
already cached user is updated from the fetched data, and an unknown one is
inserted and cached. A failed fetch is returned wrapped as `{:error, failure}`.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, data)
          User.update_and_set_cache(changeset)
        else
          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        {:error, failure}
    end
  end
end
1460
@doc """
Resolves `nickname` through WebFinger and builds the user from the ap_id found.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1468
# Filters out broken threads: returns the result of
# entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1473
# Post-processing check for a single activity; currently this only delegates to
# contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1478
@doc """
Builds the query for "Create" activities with direct visibility, ordered by
ascending activity id. The query is returned without being run.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{"type" => "Create"})
  direct_activities = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_activities, [activity], asc: activity.id)
end
1485 end