5b45e2ca1dca30c5488b53c4477d73958c25aeb3
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
383 superuser
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
386 end)
387
388 {:ok, activity}
389 else
390 {:error, error} -> Repo.rollback(error)
391 end
392 end
393
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
396 params = %{
397 "type" => "Move",
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id
401 }
402
403 with true <- origin.ap_id in target.also_known_as,
404 {:ok, activity} <- insert(params, local),
405 _ <- notify_and_stream(activity) do
406 maybe_federate(activity)
407
408 BackgroundWorker.enqueue("move_following", %{
409 "origin_id" => origin.id,
410 "target_id" => target.id
411 })
412
413 {:ok, activity}
414 else
415 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
416 err -> err
417 end
418 end
419
420 def fetch_activities_for_context_query(context, opts) do
421 public = [Constants.as_public()]
422
423 recipients =
424 if opts[:user],
425 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
426 else: public
427
428 from(activity in Activity)
429 |> maybe_preload_objects(opts)
430 |> maybe_preload_bookmarks(opts)
431 |> maybe_set_thread_muted_field(opts)
432 |> restrict_blocked(opts)
433 |> restrict_recipients(recipients, opts[:user])
434 |> restrict_filtered(opts)
435 |> where(
436 [activity],
437 fragment(
438 "?->>'type' = ? and ?->>'context' = ?",
439 activity.data,
440 "Create",
441 activity.data,
442 ^context
443 )
444 )
445 |> exclude_poll_votes(opts)
446 |> exclude_id(opts)
447 |> order_by([activity], desc: activity.id)
448 end
449
450 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
451 def fetch_activities_for_context(context, opts \\ %{}) do
452 context
453 |> fetch_activities_for_context_query(opts)
454 |> Repo.all()
455 end
456
457 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
458 FlakeId.Ecto.CompatType.t() | nil
459 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 context
461 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
462 |> restrict_visibility(%{visibility: "direct"})
463 |> limit(1)
464 |> select([a], a.id)
465 |> Repo.one()
466 end
467
468 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
469 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
470 opts = Map.delete(opts, :user)
471
472 [Constants.as_public()]
473 |> fetch_activities_query(opts)
474 |> restrict_unlisted(opts)
475 |> Pagination.fetch_paginated(opts, pagination)
476 end
477
478 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
479 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
480 opts
481 |> Map.put(:restrict_unlisted, true)
482 |> fetch_public_or_unlisted_activities(pagination)
483 end
484
485 @valid_visibilities ~w[direct unlisted public private]
486
487 defp restrict_visibility(query, %{visibility: visibility})
488 when is_list(visibility) do
489 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
490 from(
491 a in query,
492 where:
493 fragment(
494 "activity_visibility(?, ?, ?) = ANY (?)",
495 a.actor,
496 a.recipients,
497 a.data,
498 ^visibility
499 )
500 )
501 else
502 Logger.error("Could not restrict visibility to #{visibility}")
503 end
504 end
505
506 defp restrict_visibility(query, %{visibility: visibility})
507 when visibility in @valid_visibilities do
508 from(
509 a in query,
510 where:
511 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
512 )
513 end
514
515 defp restrict_visibility(_query, %{visibility: visibility})
516 when visibility not in @valid_visibilities do
517 Logger.error("Could not restrict visibility to #{visibility}")
518 end
519
520 defp restrict_visibility(query, _visibility), do: query
521
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when is_list(visibility) do
524 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
525 from(
526 a in query,
527 where:
528 not fragment(
529 "activity_visibility(?, ?, ?) = ANY (?)",
530 a.actor,
531 a.recipients,
532 a.data,
533 ^visibility
534 )
535 )
536 else
537 Logger.error("Could not exclude visibility to #{visibility}")
538 query
539 end
540 end
541
542 defp exclude_visibility(query, %{exclude_visibilities: visibility})
543 when visibility in @valid_visibilities do
544 from(
545 a in query,
546 where:
547 not fragment(
548 "activity_visibility(?, ?, ?) = ?",
549 a.actor,
550 a.recipients,
551 a.data,
552 ^visibility
553 )
554 )
555 end
556
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility not in [nil | @valid_visibilities] do
559 Logger.error("Could not exclude visibility to #{visibility}")
560 query
561 end
562
563 defp exclude_visibility(query, _visibility), do: query
564
565 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
566 do: query
567
568 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
569 do: query
570
571 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
572 from(
573 a in query,
574 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
575 )
576 end
577
578 defp restrict_thread_visibility(query, _, _), do: query
579
580 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
581 params =
582 params
583 |> Map.put(:user, reading_user)
584 |> Map.put(:actor_id, user.ap_id)
585
586 %{
587 godmode: params[:godmode],
588 reading_user: reading_user
589 }
590 |> user_activities_recipients()
591 |> fetch_activities(params)
592 |> Enum.reverse()
593 end
594
595 def fetch_user_activities(user, reading_user, params \\ %{})
596
597 def fetch_user_activities(user, reading_user, %{total: true} = params) do
598 result = fetch_activities_for_user(user, reading_user, params)
599
600 Keyword.put(result, :items, Enum.reverse(result[:items]))
601 end
602
603 def fetch_user_activities(user, reading_user, params) do
604 user
605 |> fetch_activities_for_user(reading_user, params)
606 |> Enum.reverse()
607 end
608
609 defp fetch_activities_for_user(user, reading_user, params) do
610 params =
611 params
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
616
617 params =
618 if User.blocks?(reading_user, user) do
619 params
620 else
621 params
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
624 end
625
626 pagination_type = Map.get(params, :pagination_type) || :keyset
627
628 %{
629 godmode: params[:godmode],
630 reading_user: reading_user
631 }
632 |> user_activities_recipients()
633 |> fetch_activities(params, pagination_type)
634 end
635
636 def fetch_statuses(reading_user, %{total: true} = params) do
637 result = fetch_activities_for_reading_user(reading_user, params)
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
639 end
640
641 def fetch_statuses(reading_user, params) do
642 reading_user
643 |> fetch_activities_for_reading_user(params)
644 |> Enum.reverse()
645 end
646
647 defp fetch_activities_for_reading_user(reading_user, params) do
648 params = Map.put(params, :type, ["Create", "Announce"])
649
650 %{
651 godmode: params[:godmode],
652 reading_user: reading_user
653 }
654 |> user_activities_recipients()
655 |> fetch_activities(params, :offset)
656 end
657
658 defp user_activities_recipients(%{godmode: true}), do: []
659
660 defp user_activities_recipients(%{reading_user: reading_user}) do
661 if reading_user do
662 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
663 else
664 [Constants.as_public()]
665 end
666 end
667
668 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
669 raise "Can't use the child object without preloading!"
670 end
671
672 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
673 from(
674 [activity, object] in query,
675 where:
676 fragment(
677 "?->>'type' != ? or ?->>'actor' != ?",
678 activity.data,
679 "Announce",
680 object.data,
681 ^actor
682 )
683 )
684 end
685
686 defp restrict_announce_object_actor(query, _), do: query
687
688 defp restrict_since(query, %{since_id: ""}), do: query
689
690 defp restrict_since(query, %{since_id: since_id}) do
691 from(activity in query, where: activity.id > ^since_id)
692 end
693
694 defp restrict_since(query, _), do: query
695
696 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
697 raise "Can't use the child object without preloading!"
698 end
699
700 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
701 from(
702 [_activity, object] in query,
703 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
704 )
705 end
706
707 defp restrict_tag_reject(query, _), do: query
708
709 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
710 raise "Can't use the child object without preloading!"
711 end
712
713 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
714 from(
715 [_activity, object] in query,
716 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
717 )
718 end
719
720 defp restrict_tag_all(query, _), do: query
721
722 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
723 raise "Can't use the child object without preloading!"
724 end
725
726 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
727 from(
728 [_activity, object] in query,
729 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
730 )
731 end
732
733 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
734 from(
735 [_activity, object] in query,
736 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
737 )
738 end
739
740 defp restrict_tag(query, _), do: query
741
742 defp restrict_recipients(query, [], _user), do: query
743
744 defp restrict_recipients(query, recipients, nil) do
745 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
746 end
747
748 defp restrict_recipients(query, recipients, user) do
749 from(
750 activity in query,
751 where: fragment("? && ?", ^recipients, activity.recipients),
752 or_where: activity.actor == ^user.ap_id
753 )
754 end
755
756 defp restrict_local(query, %{local_only: true}) do
757 from(activity in query, where: activity.local == true)
758 end
759
760 defp restrict_local(query, _), do: query
761
762 defp restrict_remote(query, %{remote: true}) do
763 from(activity in query, where: activity.local == false)
764 end
765
766 defp restrict_remote(query, _), do: query
767
768 defp restrict_actor(query, %{actor_id: actor_id}) do
769 from(activity in query, where: activity.actor == ^actor_id)
770 end
771
772 defp restrict_actor(query, _), do: query
773
774 defp restrict_type(query, %{type: type}) when is_binary(type) do
775 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
776 end
777
778 defp restrict_type(query, %{type: type}) do
779 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
780 end
781
782 defp restrict_type(query, _), do: query
783
784 defp restrict_state(query, %{state: state}) do
785 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
786 end
787
788 defp restrict_state(query, _), do: query
789
790 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
791 from(
792 [_activity, object] in query,
793 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
794 )
795 end
796
797 defp restrict_favorited_by(query, _), do: query
798
799 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
800 raise "Can't use the child object without preloading!"
801 end
802
803 defp restrict_media(query, %{only_media: true}) do
804 from(
805 [activity, object] in query,
806 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
807 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
808 )
809 end
810
811 defp restrict_media(query, _), do: query
812
813 defp restrict_replies(query, %{exclude_replies: true}) do
814 from(
815 [_activity, object] in query,
816 where: fragment("?->>'inReplyTo' is null", object.data)
817 )
818 end
819
820 defp restrict_replies(query, %{
821 reply_filtering_user: %User{} = user,
822 reply_visibility: "self"
823 }) do
824 from(
825 [activity, object] in query,
826 where:
827 fragment(
828 "?->>'inReplyTo' is null OR ? = ANY(?)",
829 object.data,
830 ^user.ap_id,
831 activity.recipients
832 )
833 )
834 end
835
836 defp restrict_replies(query, %{
837 reply_filtering_user: %User{} = user,
838 reply_visibility: "following"
839 }) do
840 from(
841 [activity, object] in query,
842 where:
843 fragment(
844 """
845 ?->>'type' != 'Create' -- This isn't a Create
846 OR ?->>'inReplyTo' is null -- this isn't a reply
847 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
848 -- unless they are the author (because authors
849 -- are also part of the recipients). This leads
850 -- to a bug that self-replies by friends won't
851 -- show up.
852 OR ? = ? -- The actor is us
853 """,
854 activity.data,
855 object.data,
856 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
857 activity.recipients,
858 activity.actor,
859 activity.actor,
860 ^user.ap_id
861 )
862 )
863 end
864
865 defp restrict_replies(query, _), do: query
866
867 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
868 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
869 end
870
871 defp restrict_reblogs(query, _), do: query
872
873 defp restrict_muted(query, %{with_muted: true}), do: query
874
875 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
876 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
877
878 query =
879 from([activity] in query,
880 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
881 where:
882 fragment(
883 "not (?->'to' \\?| ?) or ? = ?",
884 activity.data,
885 ^mutes,
886 activity.actor,
887 ^user.ap_id
888 )
889 )
890
891 unless opts[:skip_preload] do
892 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
893 else
894 query
895 end
896 end
897
898 defp restrict_muted(query, _), do: query
899
900 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
901 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
902 domain_blocks = user.domain_blocks || []
903
904 following_ap_ids = User.get_friends_ap_ids(user)
905
906 query =
907 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
908
909 from(
910 [activity, object: o] in query,
911 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
912 where:
913 fragment(
914 "((not (? && ?)) or ? = ?)",
915 activity.recipients,
916 ^blocked_ap_ids,
917 activity.actor,
918 ^user.ap_id
919 ),
920 where:
921 fragment(
922 "recipients_contain_blocked_domains(?, ?) = false",
923 activity.recipients,
924 ^domain_blocks
925 ),
926 where:
927 fragment(
928 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
929 activity.data,
930 activity.data,
931 ^blocked_ap_ids
932 ),
933 where:
934 fragment(
935 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
936 activity.actor,
937 ^domain_blocks,
938 activity.actor,
939 ^following_ap_ids
940 ),
941 where:
942 fragment(
943 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
944 o.data,
945 ^domain_blocks,
946 o.data,
947 ^following_ap_ids
948 )
949 )
950 end
951
952 defp restrict_blocked(query, _), do: query
953
954 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
955 from(
956 activity in query,
957 where:
958 fragment(
959 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
960 activity.data,
961 ^[Constants.as_public()]
962 )
963 )
964 end
965
966 defp restrict_unlisted(query, _), do: query
967
968 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
969 from(activity in query, where: activity.id in ^ids)
970 end
971
972 defp restrict_pinned(query, _), do: query
973
974 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
975 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
976
977 from(
978 activity in query,
979 where:
980 fragment(
981 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
982 activity.data,
983 activity.actor,
984 ^muted_reblogs
985 )
986 )
987 end
988
989 defp restrict_muted_reblogs(query, _), do: query
990
991 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
992 from(
993 activity in query,
994 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
995 )
996 end
997
998 defp restrict_instance(query, _), do: query
999
1000 defp restrict_filtered(query, %{user: %User{} = user}) do
1001 case Filter.compose_regex(user) do
1002 nil ->
1003 query
1004
1005 regex ->
1006 from([activity, object] in query,
1007 where:
1008 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1009 activity.actor == ^user.ap_id
1010 )
1011 end
1012 end
1013
1014 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1015 restrict_filtered(query, %{user: user})
1016 end
1017
1018 defp restrict_filtered(query, _), do: query
1019
1020 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1021
1022 defp exclude_poll_votes(query, _) do
1023 if has_named_binding?(query, :object) do
1024 from([activity, object: o] in query,
1025 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1026 )
1027 else
1028 query
1029 end
1030 end
1031
1032 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1033
1034 defp exclude_chat_messages(query, _) do
1035 if has_named_binding?(query, :object) do
1036 from([activity, object: o] in query,
1037 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1038 )
1039 else
1040 query
1041 end
1042 end
1043
1044 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1045
1046 defp exclude_invisible_actors(query, _opts) do
1047 invisible_ap_ids =
1048 User.Query.build(%{invisible: true, select: [:ap_id]})
1049 |> Repo.all()
1050 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1051
1052 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1053 end
1054
1055 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1056 from(activity in query, where: activity.id != ^id)
1057 end
1058
1059 defp exclude_id(query, _), do: query
1060
1061 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1062
1063 defp maybe_preload_objects(query, _) do
1064 query
1065 |> Activity.with_preloaded_object()
1066 end
1067
1068 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1069
1070 defp maybe_preload_bookmarks(query, opts) do
1071 query
1072 |> Activity.with_preloaded_bookmark(opts[:user])
1073 end
1074
1075 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1076 query
1077 |> Activity.with_preloaded_report_notes()
1078 end
1079
1080 defp maybe_preload_report_notes(query, _), do: query
1081
1082 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1083
1084 defp maybe_set_thread_muted_field(query, opts) do
1085 query
1086 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1087 end
1088
1089 defp maybe_order(query, %{order: :desc}) do
1090 query
1091 |> order_by(desc: :id)
1092 end
1093
1094 defp maybe_order(query, %{order: :asc}) do
1095 query
1096 |> order_by(asc: :id)
1097 end
1098
1099 defp maybe_order(query, _), do: query
1100
1101 defp fetch_activities_query_ap_ids_ops(opts) do
1102 source_user = opts[:muting_user]
1103 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1104
1105 ap_id_relationships =
1106 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1107 [:block | ap_id_relationships]
1108 else
1109 ap_id_relationships
1110 end
1111
1112 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1113
1114 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1115 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1116
1117 restrict_muted_reblogs_opts =
1118 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1119
1120 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1121 end
1122
1123 def fetch_activities_query(recipients, opts \\ %{}) do
1124 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1125 fetch_activities_query_ap_ids_ops(opts)
1126
1127 config = %{
1128 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1129 }
1130
1131 Activity
1132 |> maybe_preload_objects(opts)
1133 |> maybe_preload_bookmarks(opts)
1134 |> maybe_preload_report_notes(opts)
1135 |> maybe_set_thread_muted_field(opts)
1136 |> maybe_order(opts)
1137 |> restrict_recipients(recipients, opts[:user])
1138 |> restrict_replies(opts)
1139 |> restrict_tag(opts)
1140 |> restrict_tag_reject(opts)
1141 |> restrict_tag_all(opts)
1142 |> restrict_since(opts)
1143 |> restrict_local(opts)
1144 |> restrict_remote(opts)
1145 |> restrict_actor(opts)
1146 |> restrict_type(opts)
1147 |> restrict_state(opts)
1148 |> restrict_favorited_by(opts)
1149 |> restrict_blocked(restrict_blocked_opts)
1150 |> restrict_muted(restrict_muted_opts)
1151 |> restrict_filtered(opts)
1152 |> restrict_media(opts)
1153 |> restrict_visibility(opts)
1154 |> restrict_thread_visibility(opts, config)
1155 |> restrict_reblogs(opts)
1156 |> restrict_pinned(opts)
1157 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1158 |> restrict_instance(opts)
1159 |> restrict_announce_object_actor(opts)
1160 |> restrict_filtered(opts)
1161 |> Activity.restrict_deactivated_users()
1162 |> exclude_poll_votes(opts)
1163 |> exclude_chat_messages(opts)
1164 |> exclude_invisible_actors(opts)
1165 |> exclude_visibility(opts)
1166 end
1167
1168 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1169 list_memberships = Pleroma.List.memberships(opts[:user])
1170
1171 fetch_activities_query(recipients ++ list_memberships, opts)
1172 |> Pagination.fetch_paginated(opts, pagination)
1173 |> Enum.reverse()
1174 |> maybe_update_cc(list_memberships, opts[:user])
1175 end
1176
1177 @doc """
1178 Fetch favorites activities of user with order by sort adds to favorites
1179 """
1180 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1181 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1182 user.ap_id
1183 |> Activity.Queries.by_actor()
1184 |> Activity.Queries.by_type("Like")
1185 |> Activity.with_joined_object()
1186 |> Object.with_joined_activity()
1187 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1188 |> order_by([like, _, _], desc_nulls_last: like.id)
1189 |> Pagination.fetch_paginated(
1190 Map.merge(params, %{skip_order: true}),
1191 pagination
1192 )
1193 end
1194
1195 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1196 Enum.map(activities, fn
1197 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1198 if Enum.any?(bcc, &(&1 in list_memberships)) do
1199 update_in(activity.data["cc"], &[user_ap_id | &1])
1200 else
1201 activity
1202 end
1203
1204 activity ->
1205 activity
1206 end)
1207 end
1208
1209 defp maybe_update_cc(activities, _, _), do: activities
1210
1211 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1212 from(activity in query,
1213 where:
1214 fragment("? && ?", activity.recipients, ^recipients) or
1215 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1216 ^Constants.as_public() in activity.recipients)
1217 )
1218 end
1219
1220 def fetch_activities_bounded(
1221 recipients,
1222 recipients_with_public,
1223 opts \\ %{},
1224 pagination \\ :keyset
1225 ) do
1226 fetch_activities_query([], opts)
1227 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1228 |> Pagination.fetch_paginated(opts, pagination)
1229 |> Enum.reverse()
1230 end
1231
1232 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1233 def upload(file, opts \\ []) do
1234 with {:ok, data} <- Upload.store(file, opts) do
1235 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1236
1237 Repo.insert(%Object{data: obj_data})
1238 end
1239 end
1240
1241 @spec get_actor_url(any()) :: binary() | nil
1242 defp get_actor_url(url) when is_binary(url), do: url
1243 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1244
1245 defp get_actor_url(url) when is_list(url) do
1246 url
1247 |> List.first()
1248 |> get_actor_url()
1249 end
1250
1251 defp get_actor_url(_url), do: nil
1252
1253 defp object_to_user_data(data) do
1254 avatar =
1255 data["icon"]["url"] &&
1256 %{
1257 "type" => "Image",
1258 "url" => [%{"href" => data["icon"]["url"]}]
1259 }
1260
1261 banner =
1262 data["image"]["url"] &&
1263 %{
1264 "type" => "Image",
1265 "url" => [%{"href" => data["image"]["url"]}]
1266 }
1267
1268 fields =
1269 data
1270 |> Map.get("attachment", [])
1271 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1272 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1273
1274 emojis =
1275 data
1276 |> Map.get("tag", [])
1277 |> Enum.filter(fn
1278 %{"type" => "Emoji"} -> true
1279 _ -> false
1280 end)
1281 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1282 {String.trim(name, ":"), url}
1283 end)
1284
1285 is_locked = data["manuallyApprovesFollowers"] || false
1286 capabilities = data["capabilities"] || %{}
1287 accepts_chat_messages = capabilities["acceptsChatMessages"]
1288 data = Transmogrifier.maybe_fix_user_object(data)
1289 is_discoverable = data["discoverable"] || false
1290 invisible = data["invisible"] || false
1291 actor_type = data["type"] || "Person"
1292
1293 public_key =
1294 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1295 data["publicKey"]["publicKeyPem"]
1296 else
1297 nil
1298 end
1299
1300 shared_inbox =
1301 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1302 data["endpoints"]["sharedInbox"]
1303 else
1304 nil
1305 end
1306
1307 user_data = %{
1308 ap_id: data["id"],
1309 uri: get_actor_url(data["url"]),
1310 ap_enabled: true,
1311 banner: banner,
1312 fields: fields,
1313 emoji: emojis,
1314 is_locked: is_locked,
1315 is_discoverable: is_discoverable,
1316 invisible: invisible,
1317 avatar: avatar,
1318 name: data["name"],
1319 follower_address: data["followers"],
1320 following_address: data["following"],
1321 bio: data["summary"] || "",
1322 actor_type: actor_type,
1323 also_known_as: Map.get(data, "alsoKnownAs", []),
1324 public_key: public_key,
1325 inbox: data["inbox"],
1326 shared_inbox: shared_inbox,
1327 accepts_chat_messages: accepts_chat_messages
1328 }
1329
1330 # nickname can be nil because of virtual actors
1331 if data["preferredUsername"] do
1332 Map.put(
1333 user_data,
1334 :nickname,
1335 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1336 )
1337 else
1338 Map.put(user_data, :nickname, nil)
1339 end
1340 end
1341
1342 def fetch_follow_information_for_user(user) do
1343 with {:ok, following_data} <-
1344 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1345 {:ok, hide_follows} <- collection_private(following_data),
1346 {:ok, followers_data} <-
1347 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1348 {:ok, hide_followers} <- collection_private(followers_data) do
1349 {:ok,
1350 %{
1351 hide_follows: hide_follows,
1352 follower_count: normalize_counter(followers_data["totalItems"]),
1353 following_count: normalize_counter(following_data["totalItems"]),
1354 hide_followers: hide_followers
1355 }}
1356 else
1357 {:error, _} = e -> e
1358 e -> {:error, e}
1359 end
1360 end
1361
1362 defp normalize_counter(counter) when is_integer(counter), do: counter
1363 defp normalize_counter(_), do: 0
1364
1365 def maybe_update_follow_information(user_data) do
1366 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1367 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1368 {_, true} <-
1369 {:collections_available,
1370 !!(user_data[:following_address] && user_data[:follower_address])},
1371 {:ok, info} <-
1372 fetch_follow_information_for_user(user_data) do
1373 info = Map.merge(user_data[:info] || %{}, info)
1374
1375 user_data
1376 |> Map.put(:info, info)
1377 else
1378 {:user_type_check, false} ->
1379 user_data
1380
1381 {:collections_available, false} ->
1382 user_data
1383
1384 {:enabled, false} ->
1385 user_data
1386
1387 e ->
1388 Logger.error(
1389 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1390 )
1391
1392 user_data
1393 end
1394 end
1395
1396 defp collection_private(%{"first" => %{"type" => type}})
1397 when type in ["CollectionPage", "OrderedCollectionPage"],
1398 do: {:ok, false}
1399
1400 defp collection_private(%{"first" => first}) do
1401 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1402 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1403 {:ok, false}
1404 else
1405 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1406 {:error, _} = e -> e
1407 e -> {:error, e}
1408 end
1409 end
1410
1411 defp collection_private(_data), do: {:ok, true}
1412
1413 def user_data_from_user_object(data) do
1414 with {:ok, data} <- MRF.filter(data) do
1415 {:ok, object_to_user_data(data)}
1416 else
1417 e -> {:error, e}
1418 end
1419 end
1420
1421 def fetch_and_prepare_user_from_ap_id(ap_id) do
1422 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1423 {:ok, data} <- user_data_from_user_object(data) do
1424 {:ok, maybe_update_follow_information(data)}
1425 else
1426 # If this has been deleted, only log a debug and not an error
1427 {:error, "Object has been deleted" = e} ->
1428 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1429 {:error, e}
1430
1431 {:error, {:reject, reason} = e} ->
1432 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1433 {:error, e}
1434
1435 {:error, e} ->
1436 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1437 {:error, e}
1438 end
1439 end
1440
1441 def maybe_handle_clashing_nickname(data) do
1442 with nickname when is_binary(nickname) <- data[:nickname],
1443 %User{} = old_user <- User.get_by_nickname(nickname),
1444 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1445 Logger.info(
1446 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1447 data[:ap_id]
1448 }, renaming."
1449 )
1450
1451 old_user
1452 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1453 |> User.update_and_set_cache()
1454 else
1455 {:ap_id_comparison, true} ->
1456 Logger.info(
1457 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1458 )
1459
1460 _ ->
1461 nil
1462 end
1463 end
1464
1465 def make_user_from_ap_id(ap_id) do
1466 user = User.get_cached_by_ap_id(ap_id)
1467
1468 if user && !User.ap_enabled?(user) do
1469 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1470 else
1471 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1472 if user do
1473 user
1474 |> User.remote_user_changeset(data)
1475 |> User.update_and_set_cache()
1476 else
1477 maybe_handle_clashing_nickname(data)
1478
1479 data
1480 |> User.remote_user_changeset()
1481 |> Repo.insert()
1482 |> User.set_cache()
1483 end
1484 end
1485 end
1486 end
1487
1488 def make_user_from_nickname(nickname) do
1489 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1490 make_user_from_ap_id(ap_id)
1491 else
1492 _e -> {:error, "No AP id in WebFinger"}
1493 end
1494 end
1495
1496 # filter out broken threads
1497 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1498 entire_thread_visible_for_user?(activity, user)
1499 end
1500
1501 # do post-processing on a specific activity
1502 def contain_activity(%Activity{} = activity, %User{} = user) do
1503 contain_broken_threads(activity, user)
1504 end
1505
1506 def fetch_direct_messages_query do
1507 Activity
1508 |> restrict_type(%{type: "Create"})
1509 |> restrict_visibility(%{visibility: "direct"})
1510 |> order_by([activity], asc: activity.id)
1511 end
1512 end