Merge branch 'develop' into 'remove-twitter-api'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.Object.Containment
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.Pagination
17 alias Pleroma.Repo
18 alias Pleroma.Upload
19 alias Pleroma.User
20 alias Pleroma.Web.ActivityPub.MRF
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.Utils
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Web.WebFinger
25 alias Pleroma.Workers.BackgroundWorker
26
27 import Ecto.Query
28 import Pleroma.Web.ActivityPub.Utils
29 import Pleroma.Web.ActivityPub.Visibility
30
31 require Logger
32 require Pleroma.Constants
33
# Builds `{recipients, to, cc}` for an activity map. `to`, `cc` and `bcc`
# default to `[]` when the key is absent.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(Enum.concat([to, cc, bcc]), fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Recipients that do not resolve to a cached user are kept as-is.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# For Create activities the actor is added to the recipients and duplicates
# are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. Defaulting it to `[]` and then
  # wrapping it would put a stray empty list into the recipients.
  actor =
    case Map.fetch(data, "actor") do
      {:ok, actor} -> [actor]
      :error -> []
    end

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: recipients are `to ++ cc ++ bcc`, unfiltered.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
69
# Returns `true` when no actor is given, or when the actor resolves to a cached
# user whose `deactivated` field is `false`. Returns `false` in every other case.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  # Match on the lookup result instead of reading `user.deactivated` directly:
  # an unknown actor yields `nil`, and `nil.deactivated` would raise rather
  # than report the actor as inactive.
  case User.get_cached_by_ap_id(actor) do
    %{deactivated: false} -> true
    _ -> false
  end
end
82
# Compares the length of an embedded object's content with the configured
# `[:instance, :remote_limit]`. Anything without non-nil object content passes.
defp check_remote_limit(%{"object" => %{"content" => text}}) when text != nil do
  max_length = Config.get([:instance, :remote_limit])

  text
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity), do: true
89
# Calls `User.increase_note_count/1` for the actor when `object` is public;
# otherwise returns `{:ok, actor}` without touching the count.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
93
# Calls `User.decrease_note_count/1` for the actor when `object` is public;
# otherwise returns `{:ok, actor}` without touching the count.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
97
# For a Create whose object has an "inReplyTo", bumps the replies count of the
# replied-to object, but only when the new object is public (returns `nil`
# otherwise). Any other input is a `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = note
    }) do
  if is_public?(note), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
108
# For an object with an "inReplyTo", lowers the replies count of the replied-to
# object, but only when the object is public (returns `nil` otherwise).
# Any other input is a `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = note}) do
  if is_public?(note), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
118
# A Create whose object carries both "inReplyTo" and "name" is counted as a
# vote: the count for option `name` on the replied-to object is increased on
# behalf of the actor. Any other input is a `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "actor" => voter,
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
128
# Inserts `object` as an `Activity` row and hands `meta` back to the caller.
#
# `meta` must contain `:local` (`Keyword.fetch!/2` raises otherwise). The
# recipients are derived from `object` via `get_recipients/1`.
#
# Returns `{:ok, activity, meta}` on success. A failed `Repo.insert/1` result
# falls through the `with` unchanged, which the spec now reflects (it used to
# declare a two-element `{:ok, _}` tuple that the function never returns).
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
143
# Inserts an activity map into the database.
#
# Steps, in order: return the already-known activity if `Activity.normalize/1`
# finds one; fill in defaults; check the actor (unless bypassed); check the
# remote content limit; run the MRF filter; compute recipients; stop early
# for fake inserts; run the containment check; insert the child object; and
# finally insert the activity and enqueue a background data fetch.
#
# A fake insert returns an unsaved `%Activity{}` with the id "pleroma:fakeid".
# Any other failed step is returned wrapped as `{:error, step_result}`.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _to, _cc} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, child} <- insert_full_object(map) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    stored = if is_nil(child), do: stored, else: Map.put(stored, :object, child)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => stored.id})

    {:ok, stored}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
194
# Creates notifications for the activity, creates or bumps its conversation,
# then streams out the activity followed by the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
203
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; when the conversation step
# or the user lookup does not produce the expected shape, that result is
# returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
211
# Force-reloads and returns the participations of an `{:ok, conversation}`
# result; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
219
# Preloads each participation's user and pushes the participations to the
# "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
# Streams the participations of the conversation that matches the object's
# context, provided `user` can still see at least one activity in it.
# When no conversation exists the lookup result is returned unchanged; inputs
# that are not an object with a "context" are a `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{"user" => user, "blocking_user" => user}
      latest_id = fetch_latest_activity_id_for_context(conversation.ap_id, lookup_opts)

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
243
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; everything else is a `:noop`.
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
254
# Runs `do_create/2` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
261
# Transaction body of `create/2`: builds the Create data, inserts it, updates
# reply/vote/note counters, then notifies, streams and federates.
#
# Fake inserts stop right after `insert/3`. When the `:env` config is
# `:benchmark`, everything after the reply/vote counters is skipped.
# `{:error, _}` results roll the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      extra
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} -> {:ok, inserted}
    {:fake, true, faked} -> {:ok, faked}
    {:error, reason} -> Repo.rollback(reason)
  end
end
294
# Builds listen data from the params, inserts it, then notifies, streams and
# federates the resulting activity. A failing step is returned unchanged.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      extra
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
313
# Shortcut for `accept_or_reject/2` with the "Accept" activity type.
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
318
# Shortcut for `accept_or_reject/2` with the "Reject" activity type.
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
323
# Inserts an activity of the given `type` ("Accept" / "Reject") from `actor`
# about `object`, then notifies, streams and federates it.
# `:local` defaults to `true`; `:activity_id`, when given, is passed to
# `Utils.maybe_put/3` as the "id".
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
338
# Inserts an "Update" activity built from the params, then notifies, streams
# and federates it. `:activity_id`, when given, is passed to
# `Utils.maybe_put/3` as the "id".
@spec update(map()) :: {:ok, Activity.t()} | {:error, any()}
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only an explicit false disables the local flag
  local = params[:local] != false

  data =
    %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object}
    |> Utils.maybe_put("id", params[:activity_id])

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
358
# Runs `do_announce/5` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec announce(User.t(), Object.t(), String.t() | nil, boolean(), boolean()) ::
        {:ok, Activity.t(), Object.t()} | {:error, any()}
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  announce_fun = fn -> do_announce(user, object, activity_id, local, public) end

  case Repo.transaction(announce_fun) do
    {:ok, result} -> result
    failure -> failure
  end
end
373
# Transaction body of `announce/5`. Checks that the object may be announced,
# re-reads it by id, inserts the Announce, records it on the object, then
# notifies, streams and federates.
# A non-announceable object gives `{:error, false}`; `{:error, _}` results
# roll the transaction back.
defp do_announce(user, object, activity_id, local, public) do
  with true <- is_announceable?(object, user, public),
       fresh_object <- Object.get_by_id(object.id),
       announce_data <- make_announce_data(user, fresh_object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(activity, fresh_object),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    false -> {:error, false}
    {:error, reason} -> Repo.rollback(reason)
  end
end
388
# Runs `do_follow/4` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec follow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_follow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
397
# Transaction body of `follow/4`: builds and inserts the follow data, then
# notifies, streams and federates. `{:error, _}` rolls the transaction back.
defp do_follow(follower, followed, activity_id, local) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
408
# Runs `do_unfollow/4` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
417
# Transaction body of `unfollow/4`. Looks up the latest follow activity, sets
# its state to "cancelled", inserts the unfollow data, then notifies, streams
# and federates.
# Returns `nil` when no follow activity exists; `{:error, _}` rolls back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
431
# Runs `do_block/4` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec block(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | {:error, any()}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
440
# Transaction body of `block/4`. When `[:activitypub, :unfollow_blocked]` is
# set and the blocker currently follows the blocked user, an unfollow is
# issued first (its result is ignored). Then the block data is inserted,
# notified, streamed and federated. `{:error, _}` rolls back.
defp do_block(blocker, blocked, activity_id, local) do
  # `&&` short-circuits, so the follow lookup only runs when the setting is on.
  if Config.get([:activitypub, :unfollow_blocked]) && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  block_data = make_block_data(blocker, blocked, activity_id)

  with {:ok, activity} <- insert(block_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
458
# Inserts a flag (report) activity, notifies and streams it, federates a
# stripped copy, and e-mails the report to every superuser that has an e-mail
# address.
#
# The activity is addressed with an empty "to"; "cc" holds the reported
# account's ap_id unless `:forward` is explicitly `false`.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      report_email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
498
# Inserts a "Move" activity from `origin` to `target`, provided the target
# lists the origin's ap_id in `also_known_as`. On success the activity is
# notified, streamed and federated, and a "move_following" background job is
# enqueued. Other failures are returned unchanged.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    failure -> failure
  end
end
524
# Builds the query for the Create activities of a context, newest first.
# Visibility is limited to public activities plus, when `opts["user"]` is
# set, that user's own ap_id and whatever `User.following/1` returns.
def fetch_activities_for_context_query(context, opts) do
  viewer = opts["user"]
  public = [Constants.as_public()]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, viewer)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
553
# Runs `fetch_activities_for_context_query/2` and returns all matching rows.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
560
# Returns the id of the newest activity in the context that passes the
# context query's filters, or `nil`. Preloading is skipped unless `opts`
# sets "skip_preload" itself.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  # `opts` is the right-hand side of the merge, so its values win.
  query_opts = Map.merge(%{"skip_preload" => true}, opts)

  latest_id_query =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> limit(1)
    |> select([a], a.id)

  Repo.one(latest_id_query)
end
570
# Fetches a page of activities addressed to the public collection, with the
# `restrict_unlisted/1` filter applied. Any "user" entry in `opts` is dropped
# before querying.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
end
580
581 @valid_visibilities ~w[direct unlisted public private]
582
# Restricts the query to activities whose `activity_visibility(...)` equals
# the requested visibility, or is any of them when a list is given.
#
# Invalid values are logged and the query is returned unchanged, matching
# `exclude_visibility/2`. The invalid branches used to return the result of
# `Logger.error/1` (`:ok`) in place of the query, which broke every later
# step of the pipeline. A `nil` visibility means "no restriction".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # `inspect/1` keeps the message readable for lists and cannot raise on
    # entries that are not valid chardata.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
620
# Removes activities whose `activity_visibility(...)` matches the
# "exclude_visibilities" option (a single value or a list). Invalid values
# are logged and leave the query untouched; a missing or `nil` option is a
# no-op.
defp exclude_visibility(query, %{"exclude_visibilities" => excluded}) when is_list(excluded) do
  all_valid? = Enum.all?(excluded, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^excluded
      )
    )
  else
    Logger.error("Could not exclude visibility to #{excluded}")
    query
  end
end

defp exclude_visibility(query, %{"exclude_visibilities" => excluded})
     when excluded in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^excluded
    )
  )
end

defp exclude_visibility(query, %{"exclude_visibilities" => excluded})
     when excluded not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{excluded}")
  query
end

defp exclude_visibility(query, _opts), do: query
663
# Applies the `thread_visibility` SQL function for the reading user, unless
# thread containment is skipped by the config map (third argument) or by the
# user's own `skip_thread_containment` flag. Without a user in the options
# the query is left alone.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{"user" => %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: reader_ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^reader_ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
682
# Fetches activities authored by `user` as seen by `reading_user`, without
# narrowing the activity type, and reverses the fetched list.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{"user" => reading_user, "actor_id" => user.ap_id})

  recipient_opts = %{"godmode" => params["godmode"], "reading_user" => reading_user}

  recipient_opts
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
698
# Fetches the Create and Announce activities authored by `user` as seen by
# `reading_user`, and reverses the fetched list. Block and mute filtering for
# the reader is only switched on when the reader does not block `user`.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "pinned_activity_ids" => user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{"blocking_user" => reading_user, "muting_user" => reading_user})
    end

  recipient_opts = %{"godmode" => params["godmode"], "reading_user" => reading_user}

  recipient_opts
  |> user_activities_recipients()
  |> fetch_activities(params)
  |> Enum.reverse()
end
725
# Fetches Create and Announce activities visible to `reading_user` using
# offset pagination, and reverses the fetched list.
def fetch_statuses(reading_user, params) do
  params = Map.put(params, "type", ["Create", "Announce"])

  recipient_opts = %{"godmode" => params["godmode"], "reading_user" => reading_user}

  recipient_opts
  |> user_activities_recipients()
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
740
# Recipient list used by the user-activity fetchers. "godmode" yields an
# empty list. Otherwise the list is the public collection plus, for a
# signed-in reader, their own ap_id and whatever `User.following/1` returns.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reader}) do
  if reader do
    [Constants.as_public(), reader.ap_id | User.following(reader)]
  else
    [Constants.as_public()]
  end
end
752
# Keeps only activities with an id greater than "since_id". An empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
760
# Drops activities whose object carries any of the "tag_reject" tags.
# Raises when combined with "skip_preload", since the filter reads the
# object binding.
defp restrict_tag_reject(_query, %{"tag_reject" => _rejected, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => rejected})
     when is_list(rejected) and rejected != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected)
  )
end

defp restrict_tag_reject(query, _opts), do: query
774
# Keeps activities whose object carries every one of the "tag_all" tags.
# Raises when combined with "skip_preload", since the filter reads the
# object binding.
defp restrict_tag_all(_query, %{"tag_all" => _required, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => required})
     when is_list(required) and required != [] do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required)
  )
end

defp restrict_tag_all(query, _opts), do: query
788
# Keeps activities whose object carries the "tag" option: any of the tags for
# a list, exactly that tag for a string. Raises when combined with
# "skip_preload", since the filter reads the object binding.
defp restrict_tag(_query, %{"tag" => _wanted, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => wanted}) when is_list(wanted) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^wanted)
  )
end

defp restrict_tag(query, %{"tag" => wanted}) when is_binary(wanted) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\? (?)", object.data, ^wanted)
  )
end

defp restrict_tag(query, _opts), do: query
808
# Keeps activities whose recipients overlap the given list. With a user, an
# `or_where` on "actor is this user" is added as well. An empty recipient
# list disables the filter.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
822
# With "local_only" set to `true`, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
828
# With an "actor_id" option, keeps only activities by that actor.
defp restrict_actor(query, %{"actor_id" => actor_ap_id}) do
  where(query, [activity], activity.actor == ^actor_ap_id)
end

defp restrict_actor(query, _opts), do: query
834
# Filters on the activity type: an exact match for a string, membership via
# `ANY` for any other "type" value.
defp restrict_type(query, %{"type" => wanted}) when is_binary(wanted) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^wanted))
end

defp restrict_type(query, %{"type" => wanted}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^wanted))
end

defp restrict_type(query, _opts), do: query
844
# With a "state" option, keeps activities whose data "state" equals it.
defp restrict_state(query, %{"state" => wanted}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted))
end

defp restrict_state(query, _opts), do: query
850
# With a "favorited_by" option, keeps activities whose object lists that
# ap_id under "likes".
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
859
# With a truthy "only_media" option, keeps activities whose object
# "attachment" is not an empty list. Raises when combined with
# "skip_preload", since the filter reads the object binding.
defp restrict_media(_query, %{"only_media" => _flag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => flag}) when flag in [true, "true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
872
# Reply filtering, by option:
#   * truthy "exclude_replies" keeps only objects without "inReplyTo";
#   * "reply_visibility" => "self" keeps non-replies and activities that
#     list the filtering user among the recipients;
#   * "reply_visibility" => "following" keeps non-replies, activities whose
#     recipients (minus the actor) overlap the user and their cached friends,
#     and the user's own activities.
defp restrict_replies(query, %{"exclude_replies" => flag}) when flag in [true, "true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       "reply_filtering_user" => user,
       "reply_visibility" => "following"
     }) do
  visible_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^visible_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
916
# With a truthy "exclude_reblogs" option, drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in [true, "true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
922
# Hides activities authored by, or addressed ("to") to, users muted by the
# "muting_user". Unless preloading is skipped, rows whose `thread_mute`
# binding has a user id are hidden too. A truthy "with_muted" disables the
# filter. The muted ap_ids come from "muted_users_ap_ids" when supplied,
# otherwise from `User.muted_users_ap_ids/1`.
defp restrict_muted(query, %{"with_muted" => flag}) when flag in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do
  muted_ap_ids = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts["skip_preload"] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
942
# Hides content related to users or domains blocked by the "blocking_user":
#   * activities authored by a blocked user;
#   * activities whose recipients include a blocked user;
#   * Announces addressed ("to") to a blocked user;
#   * activities, and objects, whose actor is on a blocked domain, unless
#     that actor is among the user's friends.
# The object join is added when the query does not have one yet.
defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
  blocked = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user)
  blocked_domains = user.domain_blocks || []
  friends = User.get_friends_ap_ids(user)

  joined =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  joined
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^blocked_domains,
      activity.actor,
      ^friends
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^blocked_domains,
      o.data,
      ^friends
    )
  )
end

defp restrict_blocked(query, _opts), do: query
983
# Drops activities that have the public collection in "cc"; a missing "cc"
# is coalesced to an empty value.
defp restrict_unlisted(query) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
995
996 # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only,
997 # the same for `restrict_media/2`, `restrict_replies/2`, 'restrict_reblogs/2'
998 # and `restrict_muted/2`
999
# With a truthy "pinned" option, keeps only the activities whose id is in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => flag, "pinned_activity_ids" => pinned_ids})
     when flag in [true, "true", "1"] do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
1006
# Hides Announce activities by actors whose reblogs the "muting_user" has
# muted. The ap_ids come from "reblog_muted_users_ap_ids" when supplied,
# otherwise from `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do
  reblog_muted = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1023
# With an "instance" option, keeps activities whose actor is a known user
# with a nickname ending in "@<instance>". The matching ap_ids are loaded
# first with a separate query.
defp restrict_instance(query, %{"instance" => instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _opts), do: query
1037
# Drops activities whose object is of type "Answer", unless
# "include_poll_votes" is `true`. Queries without the `:object` named binding
# are returned untouched.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
1049
# With a binary "exclude_id" option, drops the activity with that id.
defp exclude_id(query, %{"exclude_id" => excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1055
# Preloads the child object unless "skip_preload" is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1062
# Preloads the bookmark for `opts["user"]` unless "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
1069
# Preloads report notes only when "preload_report_notes" is `true`.
defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1076
# Sets the thread-muted field for the "muting_user" (falling back to "user")
# unless "skip_preload" is `true`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1083
# Orders by activity id when the options carry an atom :order key of :desc or
# :asc; any other options leave the query unordered here.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1095
# Loads the outgoing relationship ap_ids of the "muting_user" in one call and
# returns three option maps {blocked_opts, muted_opts, muted_reblogs_opts},
# each being `opts` plus one preloaded list. Keys already present in `opts`
# take precedence over the preloaded values.
#
# :mute and :reblog_mute are requested whenever a muting user is given; :block
# is added only when "blocking_user" is set and equal to the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts["muting_user"]
  blocking_user = opts["blocking_user"]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  block_relationships =
    if blocking_user && blocking_user == source_user, do: [:block], else: []

  preloaded_ap_ids =
    User.outgoing_relationships_ap_ids(
      source_user,
      mute_relationships ++ block_relationships
    )

  {
    Map.put_new(opts, "blocked_users_ap_ids", preloaded_ap_ids[:block]),
    Map.put_new(opts, "muted_users_ap_ids", preloaded_ap_ids[:mute]),
    Map.put_new(opts, "reblog_muted_users_ap_ids", preloaded_ap_ids[:reblog_mute])
  }
end
1118
# Builds (without running) the activity query for `recipients` and `opts`.
#
# Starting from Activity, the optional preloads and ordering are applied first,
# followed by every restrict_*/exclude_* filter in the order listed below. The
# block, mute and reblog-mute filters receive `opts` enriched with the ap_ids
# preloaded by fetch_activities_query_ap_ids_ops/1.
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_visibility(opts)
end
1157
# Runs the activity query for `recipients` extended by the list memberships of
# the "user" option, paginates it, reverses the fetched page and finally lets
# maybe_update_cc/3 adjust "cc" for list-addressed activities.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, user)
end
1166
@doc """
Fetches the favourites of `user`, ordered by when they were favourited.

The query starts from the user's "Like" activities, joins the liked object and
that object's activity, and selects the joined activity with its `object` field
set to the joined object. Results are ordered by the id of the Like activity
(descending), so pagination is asked to skip its own ordering.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  pagination_params = Map.put(params, "skip_order", true)

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([_like, object, activity], %{activity | object: object})
    |> order_by([like, _, _], desc: like.id)

  Pagination.fetch_paginated(
    favourites_query,
    pagination_params,
    pagination,
    :object_activity
  )
end
1185
# Makes list-addressed activities look addressed to the viewing user.
#
# For every activity whose "bcc" is a non-empty list containing at least one of
# `list_memberships`, the user's ap_id is prepended to the activity's "cc".
# All other activities are returned unchanged. Nothing is done when there are
# no list memberships or no user.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a single bare value; List.wrap/1 keeps
        # the result a proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1202
# Restricts `query` to activities that either overlap `recipients`, or overlap
# `recipients_with_public` while also being addressed to the public collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1211
# Like fetch_activities/3, but the recipient filtering is done by
# fetch_activities_bounded_query/3: the base query is built with an empty
# recipient list and then bounded, paginated and reversed.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1223
# Stores `file` through Upload.store/2 and inserts an Object holding the
# returned data. When opts[:actor] is truthy it is recorded under the "actor"
# key of the object data. Any non-{:ok, _} result of the store step is
# returned to the caller as is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data =
        case opts[:actor] do
          absent when absent in [nil, false] -> data
          actor -> Map.put(data, "actor", actor)
        end

      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1237
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map with a binary "href", or a list of such values (only the first
# element is considered). Anything else, including an empty list, yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
1249
# Converts a fetched actor object (string-keyed map) into the atom-keyed map of
# user attributes used when creating or updating a remote user.
#
# Always returns {:ok, user_data}. Optional parts of the actor document
# (icon, image, attachment, tag, publicKey, endpoints, preferredUsername) may
# be absent; malformed attachment/tag entries are ignored instead of raising.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # List.wrap/1 tolerates a missing or null "attachment" as well as a single
  # bare object. The generator pattern skips entries that are not maps of type
  # "PropertyValue" rather than failing on them.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Only well-formed Emoji tags (binary name plus icon url) are collected; a
  # later tag with the same name overrides an earlier one.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # `locked` is read from the document as received; the remaining values are
  # read after Transmogrifier.maybe_fix_user_object/1 has processed it.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    nickname: nickname
  }

  {:ok, user_data}
end
1338
# Fetches the remote following and followers collections of `user` (in that
# order) and derives follow information from them.
#
# Returns {:ok, %{hide_follows, hide_followers, follower_count,
# following_count}}. Counts come from each collection's "totalItems" and fall
# back to 0 when that is not an integer. Any failing step is returned as an
# {:error, _} tuple (non-error mismatches are wrapped in one).
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_information = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_information}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1358
# Integer counters pass through unchanged; every other value counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1361
# Adds remotely fetched follow information to `user_data` under the :info key,
# provided that external user synchronization is enabled, the user type is
# "Person" or "Service", and both collection addresses are present. In every
# other case `user_data` is returned unchanged; unexpected failures are logged.
#
# NOTE(review): the type check reads user_data[:type], while the map built by
# object_to_user_data/1 in this module stores the type under :actor_type, so
# this check looks like it cannot pass for such data. Confirm the intended key.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition was not met: nothing to update, nothing to report.
    {step, false} when step in [:enabled, :user_type_check, :collections_available] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1392
# Decides whether a remote collection is private, returning {:ok, boolean} or
# an {:error, _} tuple.
#
# * An embedded first page of a (ordered) collection page type: not private.
# * Any other "first" value is fetched: a collection page means not private,
#   a 401/403 response means private, other results are errors.
# * No "first" at all: treated as private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1409
# Runs the actor object through MRF.filter/1 and converts the filtered object
# into user data. Returns {:ok, user_data}; whatever makes either step fail is
# wrapped as {:error, failure}.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = converted <- object_to_user_data(filtered) do
    converted
  else
    failure -> {:error, failure}
  end
end
1418
# Fetches the remote actor at `ap_id`, converts it to user data and lets
# maybe_update_follow_information/1 extend it. Returns {:ok, user_data}.
#
# Failures are logged and returned wrapped as {:error, failure}; a deleted
# object is only logged at debug level, everything else at error level.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, "Object has been deleted"} = deleted ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(deleted)}")
      {:error, deleted}

    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1434
# Creates or refreshes the user identified by `ap_id`.
#
# A cached user that is not yet AP-enabled is handed to
# Transmogrifier.upgrade_user_from_ap_id/1. Otherwise the actor is fetched
# and either the cached user is updated with the fetched data or a new user is
# inserted; both paths go through the user cache helpers. A failed fetch is
# returned wrapped as {:error, failure}.
def make_user_from_ap_id(ap_id) do
  existing_user = User.get_cached_by_ap_id(ap_id)

  if existing_user && !User.ap_enabled?(existing_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if existing_user do
          existing_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        {:error, failure}
    end
  end
end
1457
# Resolves `nickname` through WebFinger and builds the user from the resulting
# ap_id. Any WebFinger result without a non-nil "ap_id" yields a fixed error.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
1465
# Filter helper for broken threads: returns the result of
# entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1470
# Post-processing hook for a single activity; currently this is just the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1475
# Query for "Create" activities with "direct" visibility, ordered by ascending
# activity id.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1482 end