Add support for actor icon being a list (Bridgy)
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw "to"/"cc"
# values (defaulting to `[]`) and `recipients` is the concatenation of
# "to", "cc" and "bcc".
#
# For "Create" activities the actor is added to the recipients and the list
# is de-duplicated. A missing (or nil) actor contributes nothing; it used to
# leak a stray `[]` (or `nil`) element into the recipient list.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  actor_recipients =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor_recipients] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation, no de-duplication.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# Tells whether the actor may create activities.
# A nil actor passes; a binary AP id passes only when the cached user
# exists and has `is_active: true`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
63
# Checks the embedded object's content length against the configured
# `[:instance, :remote_limit]`. Activities without (non-nil) object content
# always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
70
@doc """
Increases the actor's note count when `object` is public.

Returns the result of `User.increase_note_count/1`, or `{:ok, actor}` when
the object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1`, or `{:ok, actor}` when
the object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# Bumps the replies counter of the replied-to object for public "Create"
# activities whose object carries an "inReplyTo". Anything else is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = child
     }) do
  # Non-public replies fall through and yield nil.
  if is_public?(child), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

# Persists a map coming through the pipeline.
# Maps whose "type" is one of @object_types are stored via `Object.create/1`;
# everything else is inserted as an `Activity` row.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  # `:local` is mandatory in meta; Keyword.fetch!/2 raises when it is absent.
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
# Inserts an activity map, running the checks below in order.
#
# Arguments:
#   * `map` - the activity data
#   * `local` - stored in the activity's `local` field
#   * `fake` - when true, nothing is inserted; an in-memory activity with the
#     id "pleroma:fakeid" is returned instead
#   * `bypass_actor_check` - when true, the actor activity check is skipped
#
# Returns `{:ok, activity}` on success, or when `Activity.normalize/1`
# already yields an activity for `map`. Failure shapes are listed in the
# `else` branches.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # Each check is tagged so the `else` block can tell which step failed.
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media data is fetched in a separate task, wrapped in the limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # Activity.normalize/1 returned an activity: hand it back untouched.
    %Activity{} = activity ->
      {:ok, activity}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error

    # Fake insert: build the struct without touching the database. Rich media
    # is fetched inline here rather than in a task.
    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    # A `{:reject, _}` result (e.g. from MRF.filter/1) is wrapped as an error.
    {:reject, _} = e ->
      {:error, e}
  end
end
167
# Inserts the activity row and, on success, schedules its expiration (if the
# data asks for one). A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# Enqueues a PurgeExpiredActivity job when the activity data carries a
# `DateTime` under "expires_at"; otherwise returns `{:ok, activity}` right away.
# A failed enqueue is returned unchanged.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for the activity and marks it as read
# for the acting user. Any non-matching intermediate result (failed
# conversation step, actor not found in cache) is returned as-is.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conv} <- Conversation.create_or_bump_for(activity),
       %User{} = acting_user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(acting_user, conv)

    {:ok, conv}
  end
end
211
# Loads the participations of a conversation wrapped in `{:ok, _}`; the
# preload is forced, so already-loaded associations are refreshed.
# Any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  %{participations: participations} =
    Repo.preload(conversation, :participations, force: true)

  participations
end

defp get_participations(_), do: []
219
# Preloads the user of each participation and pushes them to the
# "participation" stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
227
# Streams the participations of the conversation an object belongs to, but
# only when a direct activity id is found for that context with the given
# user's restrictions. Objects without a "context" are a no-op.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      loaded = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(loaded.ap_id, lookup_opts) do
        stream_out_participations(loaded.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
# Streams Create/Announce/Delete activities to the topics computed for them;
# every other activity is ignored.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
@doc """
Builds and inserts a Create activity inside a database transaction.

With `fake` set to true the activity is returned without being stored.
A failed transaction is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
267
# Transaction body of create/2.
#
# Builds the Create data, inserts it, then (for real, non-benchmark inserts)
# updates reply/note counters, notifies, streams and federates.
# An `{:error, message}` from any step rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  # In the :benchmark env the counters/notifications/federation steps are skipped.
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  # The :fake and :quick_insert tags short-circuit the chain; the matching
  # `else` branches return the activity as a success.
  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
300
@doc """
Builds and inserts a Listen activity, then notifies, streams and federates it.

`params[:local]` counts as false only when it is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
@doc """
Cancels the latest follow of `followed` by `follower` and inserts the
corresponding unfollow activity, all inside a transaction.

Returns `nil` when no follow activity is found.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Transaction body of unfollow/4: marks the latest follow as "cancelled",
# inserts the unfollow activity, then notifies, streams and federates it.
# An `{:error, _}` rolls back; a missing follow activity yields nil.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         follower
         |> make_unfollow_data(followed, cancelled_follow, activity_id)
         |> insert(local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
343
@doc """
Creates a Flag (report) activity inside a database transaction.
A failed transaction is returned unchanged.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Transaction body of flag/1.
#
# Inserts the report, federates a stripped copy of it, and e-mails every
# superuser other than the reporter that has an e-mail address.
# The reported account is put in "cc" only when forwarding is enabled.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn user -> user.ap_id == actor or is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
393
@doc """
Inserts a Move activity from `origin` to `target` and enqueues the
"move_following" background job.

The target must list the origin's AP id in `also_known_as`; otherwise an
error tuple is returned and nothing is inserted.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  if origin.ap_id in target.also_known_as do
    move_data = %{
      "type" => "Move",
      "actor" => origin.ap_id,
      "object" => origin.ap_id,
      "target" => target.ap_id
    }

    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
419
@doc """
Builds the query for the Create activities of a `context`, newest first.

Visible recipients are the public address plus, when `opts[:user]` is set,
that user and everyone they follow.
"""
def fetch_activities_for_context_query(context, opts) do
  user = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
449
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
456
@doc """
Returns the id of the newest direct-visibility activity in `context`, or
`nil` when there is none. Preloading is skipped unless `opts` overrides it.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Caller-supplied opts win over the skip_preload default.
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
467
@doc """
Fetches a page of activities addressed to the public collection.

Any `:user` in `opts` is dropped before querying.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], opts)

  query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
477
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to true.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
484
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility matches
# `opts[:visibility]` (a single value or a list of values).
#
# Always returns a query. Invalid values are logged and the query is left
# unrestricted, the same way exclude_visibility/2 handles them. The invalid
# branches used to return the `:ok` of Logger.error/1, which broke any
# pipeline the result was fed into. A nil visibility means "no restriction"
# and is not logged.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
521
# Drops activities whose computed visibility matches
# `opts[:exclude_visibilities]` (a single value or a list of values).
# Invalid values are logged and the query is returned untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  case Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    true ->
      where(
        query,
        [a],
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
      )

    false ->
      Logger.error("Could not exclude visibility to #{visibility}")
      query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
564
# Applies the `thread_visibility` database check for the reading user.
# Skipped when thread containment is disabled in the config map or on the
# user, or when there is no user in opts.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
579
@doc """
Fetches activities authored by `user` as seen by `reading_user`, returned
in reverse of the fetched order.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
594
@doc """
Fetches the Create/Announce activities of `user` as seen by `reading_user`,
in reverse of the fetched order.

With `total: true` in `params` the result is a keyword list whose `:items`
entry is reversed; otherwise it is a plain reversed list.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
608
# Shared fetch for fetch_user_activities/3.
# Block/mute filtering for the reader is applied only when the reader does
# not block the profile owner. Pagination defaults to :keyset.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
635
# Fetches Create/Announce activities visible to `reading_user` (offset
# pagination), in reverse of the fetched order. With `total: true` the
# result is a keyword list whose `:items` entry is reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
646
# Shared fetch for fetch_statuses/2: forces the type filter to
# Create/Announce and uses offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
657
# Recipient list used to scope user timelines.
# godmode -> [] (no recipient restriction); otherwise the public address,
# plus the reader and everyone the reader follows when a reader is given.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  reader_addresses =
    if reading_user do
      [reading_user.ap_id | User.following(reading_user)]
    else
      []
    end

  [Constants.as_public() | reader_addresses]
end
667
# Hides Announces of objects authored by `announce_filtering_user`.
# Needs the joined object, so it refuses to run with skip_preload.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
687
# Keeps only activities with an id greater than `since_id`.
# An empty-string or absent since_id leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
695
# Drops activities whose object carries any of the rejected tags.
# Needs the joined object, so it refuses to run with skip_preload.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
708
# Keeps only activities whose object carries every tag in `tag_all`.
# Needs the joined object, so it refuses to run with skip_preload.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
721
# Keeps only activities whose object carries the given tag (binary) or any
# of the given tags (list).
# Needs the joined object, so it refuses to run with skip_preload.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
741
# Keeps activities whose recipients overlap the given list. With a user,
# activities authored by that user are admitted as well (via or_where).
# An empty recipient list means no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
755
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
761
# With `remote: true`, keeps only activities not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
767
# Keeps only activities authored by `actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
773
# Filters on the activity "type": exact match for a binary, membership
# (SQL ANY) for any other value.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
783
# Filters on the activity "state" field, when a state is given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
789
# Keeps only activities whose object lists `ap_id` under "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
798
# With `only_media: true`, keeps Create activities whose object "attachment"
# is not an empty list.
# Needs the joined object, so it refuses to run with skip_preload.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
812
# Reply filtering. Three modes, selected by the opts:
#   * `exclude_replies: true` - drop every object that has an "inReplyTo"
#   * `reply_visibility: "self"` - keep non-replies and activities that have
#     the filtering user among their recipients
#   * `reply_visibility: "following"` - see the SQL comments in that clause
# Any other opts leave the query untouched.
defp restrict_replies(query, %{exclude_replies: true}) do
  from(
    [_activity, object] in query,
    where: fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  from(
    [activity, object] in query,
    where:
      fragment(
        "?->>'inReplyTo' is null OR ? = ANY(?)",
        object.data,
        ^user.ap_id,
        activity.recipients
      )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  # The fragment parameters below are positional: they bind to the `?`
  # placeholders in order of appearance.
  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create' -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
        -- unless they are the author (because authors
        -- are also part of the recipients). This leads
        -- to a bug that self-replies by friends won't
        -- show up.
        OR ? = ? -- The actor is us
        """,
        activity.data,
        object.data,
        ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _), do: query
866
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
872
# Hides activities by muted actors and activities addressed ("to") to muted
# actors, unless the muting user is the author. Unless preloading is
# skipped, only rows without a thread mute are kept as well.
# `with_muted: true` disables the filter altogether.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list when the caller supplied one.
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_actors =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_actors
  else
    from([thread_mute: tm] in without_muted_actors, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
899
# Applies the blocking user's user blocks and domain blocks to the query.
# Without a `%User{}` under `:blocking_user` the query is returned untouched.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  # Prefer the pre-fetched block list when the caller supplied one.
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  # The last condition reads the object's data, so make sure the object is joined.
  query =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  from(
    [activity, object: o] in query,
    # The actor is not blocked.
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
    # No recipient is blocked, unless the blocking user is the author.
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),
    # Recipients are checked against the blocked domains by a database function.
    where:
      fragment(
        "recipients_contain_blocked_domains(?, ?) = false",
        activity.recipients,
        ^domain_blocks
      ),
    # Not an Announce addressed ("to") to a blocked user.
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),
    # The actor's domain is not blocked, unless the actor is followed.
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),
    # Same domain check for the object's actor.
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
953
# With `restrict_unlisted: true`, drops activities that carry the public
# address in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
967
# With `pinned: true`, keeps only activities whose id is in `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
973
# Hides Announces made by actors whose reblogs the muting user has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list when the caller supplied one.
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
990
# Keeps only activities whose actor URL has `instance` as its third
# '/'-separated part.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
999
# Hides activities whose object content matches the regex composed from the
# user's filters, except for the user's own activities. Nothing is filtered
# when no regex is composed. `:blocking_user` is the fallback when `:user`
# is absent.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
1019
# Drops poll votes (objects of type "Answer") unless
# `include_poll_votes: true`. Has no effect when the object is not joined.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1031
# Drops objects of type "ChatMessage" unless `include_chat_messages: true`.
# Has no effect when the object is not joined.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1043
# Drops activities authored by invisible users unless
# `invisible_actors: true`. The invisible AP ids are fetched with a
# separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1054
# Drops the activity with the given (binary) id, when one is given.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1060
# Preloads the activity's object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1067
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1074
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1081
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1088
# Orders by id when `opts[:order]` is :desc or :asc; otherwise leaves the
# query without an explicit ordering.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1100
# Fetches the muting user's mute / reblog-mute (and, when the blocking user
# is the same user, block) AP ids through a single
# User.outgoing_relationships_ap_ids/2 call and returns three opts maps
# carrying them: `{blocked_opts, muted_opts, muted_reblogs_opts}`.
# Values already present in `opts` take precedence over the fetched ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  ap_id_relationships =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  }
end
1122
@doc """
Builds the activity query for `recipients`, applying every preload, ordering,
restriction and exclusion step driven by `opts`.

The query is only composed here; nothing is executed.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {blocked_opts, muted_opts, muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts)

  thread_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # The order of the steps below is preserved exactly as it was.
  # NOTE(review): restrict_filtered/2 is applied twice in this pipeline; it is
  # defined outside this section, so confirm whether the second call is needed.
  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_remote(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(blocked_opts)
  |> restrict_muted(muted_opts)
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, thread_config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> restrict_filtered(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1167
@doc """
Fetches one page of activities for `recipients`.

The list memberships of `opts[:user]` are appended to `recipients` before the
query is built. The paginated result is reversed and then passed through
`maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  memberships = Pleroma.List.memberships(viewer)
  query = fetch_activities_query(recipients ++ memberships, opts)

  query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(memberships, viewer)
end
1176
@doc """
Fetches a page of the activities `user` has favourited.

The query starts from the user's `Like` activities and is ordered by the id of
the `Like` (descending, nulls last), i.e. by when the favourite was added.
Each result is the joined activity with its `object` filled in and
`pagination_id` set to the id of the `Like`. `skip_order: true` is added to
`params` before they are handed to `Pagination.fetch_paginated/3`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1194
# Adds the requesting user's AP id to the "cc" of every activity whose "bcc"
# contains one of the user's list memberships.
#
# Only does work when there is at least one list membership and a `%User{}`;
# otherwise the activities are returned untouched.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a bare value; List.wrap/1 keeps the
        # result a proper list instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1210
# Restricts the query to activities whose recipients overlap `recipients`, or
# that overlap `recipients_with_public` while also being addressed to the
# public collection.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1219
@doc """
Fetches one page of activities bounded by two recipient lists.

The base query is built with an empty recipient list and then narrowed by
`fetch_activities_bounded_query/3`. The paginated result is reversed before it
is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1231
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is given it is added to the object data under `"actor"`.
Any result of `Upload.store/2` other than `{:ok, data}` is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1240
# Extracts a URL string from an actor's "url" value. Accepts a plain string, a
# map with a binary "href", or a list of those (only the first element is
# considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url(_other), do: nil
1252
# Normalises an actor image ("icon"/"image") into the internal
# `%{"type" => "Image", "url" => [%{"href" => url}]}` shape. A list is reduced
# to its first element; values without a "url" key yield nil.
defp normalize_image([first | _rest]), do: normalize_image(first)

defp normalize_image(%{"url" => url}),
  do: %{"type" => "Image", "url" => [%{"href" => url}]}

defp normalize_image(_other), do: nil
1262
# Converts an actor document (string-keyed map) into the attribute map used to
# create or update a user.
#
# The input comes from remote servers, so list-valued properties are handled
# defensively: "attachment" and "tag" may be missing, null, a single object or
# a list, and entries that do not have the expected shape are skipped rather
# than raising.
#
# The returned map always carries a `:nickname` key; it is nil when the actor
# has no "preferredUsername".
defp object_to_user_data(data) do
  # Profile fields: only PropertyValue attachments, reduced to name/value.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  # Custom emoji: shortcode (colons trimmed) => image URL. Tags without a
  # usable name or icon URL are ignored.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name) and is_binary(url),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # These three are read from the document as received, i.e. before
  # Transmogrifier.maybe_fix_user_object/1 is applied.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  data = Transmogrifier.maybe_fix_user_object(data)

  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1337
@doc """
Fetches the following and follower collections of `user` and derives follow
statistics from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`, or `{:error, reason}` when any step
fails.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1357
# Returns the counter unchanged when it is an integer and 0 for anything else
# (e.g. a missing "totalItems").
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1360
@doc """
Merges freshly fetched follow statistics into `user_data[:info]` when external
user synchronization is enabled, the user type is "Person" or "Service", and
both collection addresses are present.

In every other case `user_data` is returned unchanged; unexpected failures are
logged first.
"""
# NOTE(review): the type check reads `user_data[:type]`, while
# object_to_user_data/1 in this module emits `:actor_type`. For maps built
# there the check is false — confirm whether that is intended.
def maybe_update_follow_information(user_data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # A precondition was not met: nothing to update and nothing to report.
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1391
# Decides whether a follow collection is private.
#
# Returns `{:ok, false}` when the collection's "first" page is (or fetches as)
# a CollectionPage / OrderedCollectionPage, and `{:ok, true}` when there is no
# "first" entry or fetching it is answered with 401/403. Other fetch results
# are returned as `{:error, reason}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1408
@doc """
Runs the actor document through `MRF.filter/1` and converts the filtered
document into user attributes.

Returns `{:ok, user_data}`, or `{:error, result}` where `result` is whatever
`MRF.filter/1` returned instead of `{:ok, data}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1416
@doc """
Fetches the actor document at `ap_id` and turns it into user attributes,
passed through `maybe_update_follow_information/1`.

Returns `{:ok, user_data}` or `{:error, reason}`. The failure is logged at a
level that depends on the reason: debug for deleted objects, info for
rejections, error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted object is an expected outcome, so it is only worth a debug line.
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, rejection}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1436
@doc """
Renames an existing user that already holds the nickname in `data` but has a
different AP id, so the nickname becomes free.

The existing user's nickname becomes `"<id>.<nickname>"`. Nothing is changed
when there is no nickname, no existing user, or the AP ids are equal.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = existing <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == existing.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{existing.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = %{nickname: "#{existing.id}.#{existing.nickname}"}

    existing
    |> User.remote_user_changeset(renamed)
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1460
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched; an existing user is updated with the fetched
data, and a new one is inserted after any nickname clash has been resolved.
A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1483
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user for
the AP id found there.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1491
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1496
# Post-processing hook for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1501
@doc """
Builds a query over activities restricted to type `"Create"` and visibility
`"direct"`, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1508 end