Merge branch 'bugfix/notice-external-redirect' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
382 superuser
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
385 end)
386
387 {:ok, activity}
388 else
389 {:error, error} -> Repo.rollback(error)
390 end
391 end
392
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
395 params = %{
396 "type" => "Move",
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
400 }
401
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
406
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
410 })
411
412 {:ok, activity}
413 else
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
415 err -> err
416 end
417 end
418
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
421
422 recipients =
423 if opts[:user],
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
425 else: public
426
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
434 |> where(
435 [activity],
436 fragment(
437 "?->>'type' = ? and ?->>'context' = ?",
438 activity.data,
439 "Create",
440 activity.data,
441 ^context
442 )
443 )
444 |> exclude_poll_votes(opts)
445 |> exclude_id(opts)
446 |> order_by([activity], desc: activity.id)
447 end
448
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(opts)
453 |> Repo.all()
454 end
455
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 context
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
462 |> limit(1)
463 |> select([a], a.id)
464 |> Repo.one()
465 end
466
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
470
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
475 end
476
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 opts
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
482 end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 from(
490 a in query,
491 where:
492 fragment(
493 "activity_visibility(?, ?, ?) = ANY (?)",
494 a.actor,
495 a.recipients,
496 a.data,
497 ^visibility
498 )
499 )
500 else
501 Logger.error("Could not restrict visibility to #{visibility}")
502 end
503 end
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
507 from(
508 a in query,
509 where:
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
511 )
512 end
513
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518
519 defp restrict_visibility(query, _visibility), do: query
520
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 not fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not exclude visibility to #{visibility}")
537 query
538 end
539 end
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ?",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561
562 defp exclude_visibility(query, _visibility), do: query
563
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
565 do: query
566
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
568 do: query
569
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
571 from(
572 a in query,
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
574 )
575 end
576
577 defp restrict_thread_visibility(query, _, _), do: query
578
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
580 params =
581 params
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584
585 %{
586 godmode: params[:godmode],
587 reading_user: reading_user
588 }
589 |> user_activities_recipients()
590 |> fetch_activities(params)
591 |> Enum.reverse()
592 end
593
594 def fetch_user_activities(user, reading_user, params \\ %{})
595
596 def fetch_user_activities(user, reading_user, %{total: true} = params) do
597 result = fetch_activities_for_user(user, reading_user, params)
598
599 Keyword.put(result, :items, Enum.reverse(result[:items]))
600 end
601
602 def fetch_user_activities(user, reading_user, params) do
603 user
604 |> fetch_activities_for_user(reading_user, params)
605 |> Enum.reverse()
606 end
607
608 defp fetch_activities_for_user(user, reading_user, params) do
609 params =
610 params
611 |> Map.put(:type, ["Create", "Announce"])
612 |> Map.put(:user, reading_user)
613 |> Map.put(:actor_id, user.ap_id)
614 |> Map.put(:pinned_activity_ids, user.pinned_activities)
615
616 params =
617 if User.blocks?(reading_user, user) do
618 params
619 else
620 params
621 |> Map.put(:blocking_user, reading_user)
622 |> Map.put(:muting_user, reading_user)
623 end
624
625 pagination_type = Map.get(params, :pagination_type) || :keyset
626
627 %{
628 godmode: params[:godmode],
629 reading_user: reading_user
630 }
631 |> user_activities_recipients()
632 |> fetch_activities(params, pagination_type)
633 end
634
635 def fetch_statuses(reading_user, %{total: true} = params) do
636 result = fetch_activities_for_reading_user(reading_user, params)
637 Keyword.put(result, :items, Enum.reverse(result[:items]))
638 end
639
640 def fetch_statuses(reading_user, params) do
641 reading_user
642 |> fetch_activities_for_reading_user(params)
643 |> Enum.reverse()
644 end
645
646 defp fetch_activities_for_reading_user(reading_user, params) do
647 params = Map.put(params, :type, ["Create", "Announce"])
648
649 %{
650 godmode: params[:godmode],
651 reading_user: reading_user
652 }
653 |> user_activities_recipients()
654 |> fetch_activities(params, :offset)
655 end
656
657 defp user_activities_recipients(%{godmode: true}), do: []
658
659 defp user_activities_recipients(%{reading_user: reading_user}) do
660 if reading_user do
661 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
662 else
663 [Constants.as_public()]
664 end
665 end
666
667 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
668 raise "Can't use the child object without preloading!"
669 end
670
671 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
672 from(
673 [activity, object] in query,
674 where:
675 fragment(
676 "?->>'type' != ? or ?->>'actor' != ?",
677 activity.data,
678 "Announce",
679 object.data,
680 ^actor
681 )
682 )
683 end
684
685 defp restrict_announce_object_actor(query, _), do: query
686
687 defp restrict_since(query, %{since_id: ""}), do: query
688
689 defp restrict_since(query, %{since_id: since_id}) do
690 from(activity in query, where: activity.id > ^since_id)
691 end
692
693 defp restrict_since(query, _), do: query
694
695 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
696 raise "Can't use the child object without preloading!"
697 end
698
699 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
700 from(
701 [_activity, object] in query,
702 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
703 )
704 end
705
706 defp restrict_tag_reject(query, _), do: query
707
708 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
709 raise "Can't use the child object without preloading!"
710 end
711
712 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
713 from(
714 [_activity, object] in query,
715 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
716 )
717 end
718
719 defp restrict_tag_all(query, _), do: query
720
721 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
722 raise "Can't use the child object without preloading!"
723 end
724
725 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
726 from(
727 [_activity, object] in query,
728 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
729 )
730 end
731
732 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
733 from(
734 [_activity, object] in query,
735 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
736 )
737 end
738
739 defp restrict_tag(query, _), do: query
740
741 defp restrict_recipients(query, [], _user), do: query
742
743 defp restrict_recipients(query, recipients, nil) do
744 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
745 end
746
747 defp restrict_recipients(query, recipients, user) do
748 from(
749 activity in query,
750 where: fragment("? && ?", ^recipients, activity.recipients),
751 or_where: activity.actor == ^user.ap_id
752 )
753 end
754
755 defp restrict_local(query, %{local_only: true}) do
756 from(activity in query, where: activity.local == true)
757 end
758
759 defp restrict_local(query, _), do: query
760
761 defp restrict_remote(query, %{remote: true}) do
762 from(activity in query, where: activity.local == false)
763 end
764
765 defp restrict_remote(query, _), do: query
766
767 defp restrict_actor(query, %{actor_id: actor_id}) do
768 from(activity in query, where: activity.actor == ^actor_id)
769 end
770
771 defp restrict_actor(query, _), do: query
772
773 defp restrict_type(query, %{type: type}) when is_binary(type) do
774 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
775 end
776
777 defp restrict_type(query, %{type: type}) do
778 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
779 end
780
781 defp restrict_type(query, _), do: query
782
783 defp restrict_state(query, %{state: state}) do
784 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
785 end
786
787 defp restrict_state(query, _), do: query
788
789 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
790 from(
791 [_activity, object] in query,
792 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
793 )
794 end
795
796 defp restrict_favorited_by(query, _), do: query
797
798 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
799 raise "Can't use the child object without preloading!"
800 end
801
802 defp restrict_media(query, %{only_media: true}) do
803 from(
804 [activity, object] in query,
805 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
806 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
807 )
808 end
809
810 defp restrict_media(query, _), do: query
811
812 defp restrict_replies(query, %{exclude_replies: true}) do
813 from(
814 [_activity, object] in query,
815 where: fragment("?->>'inReplyTo' is null", object.data)
816 )
817 end
818
819 defp restrict_replies(query, %{
820 reply_filtering_user: %User{} = user,
821 reply_visibility: "self"
822 }) do
823 from(
824 [activity, object] in query,
825 where:
826 fragment(
827 "?->>'inReplyTo' is null OR ? = ANY(?)",
828 object.data,
829 ^user.ap_id,
830 activity.recipients
831 )
832 )
833 end
834
835 defp restrict_replies(query, %{
836 reply_filtering_user: %User{} = user,
837 reply_visibility: "following"
838 }) do
839 from(
840 [activity, object] in query,
841 where:
842 fragment(
843 """
844 ?->>'type' != 'Create' -- This isn't a Create
845 OR ?->>'inReplyTo' is null -- this isn't a reply
846 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
847 -- unless they are the author (because authors
848 -- are also part of the recipients). This leads
849 -- to a bug that self-replies by friends won't
850 -- show up.
851 OR ? = ? -- The actor is us
852 """,
853 activity.data,
854 object.data,
855 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
856 activity.recipients,
857 activity.actor,
858 activity.actor,
859 ^user.ap_id
860 )
861 )
862 end
863
864 defp restrict_replies(query, _), do: query
865
866 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
867 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
868 end
869
870 defp restrict_reblogs(query, _), do: query
871
872 defp restrict_muted(query, %{with_muted: true}), do: query
873
874 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
875 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
876
877 query =
878 from([activity] in query,
879 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
880 where:
881 fragment(
882 "not (?->'to' \\?| ?) or ? = ?",
883 activity.data,
884 ^mutes,
885 activity.actor,
886 ^user.ap_id
887 )
888 )
889
890 unless opts[:skip_preload] do
891 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
892 else
893 query
894 end
895 end
896
897 defp restrict_muted(query, _), do: query
898
899 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
900 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
901 domain_blocks = user.domain_blocks || []
902
903 following_ap_ids = User.get_friends_ap_ids(user)
904
905 query =
906 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
907
908 from(
909 [activity, object: o] in query,
910 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
911 where:
912 fragment(
913 "((not (? && ?)) or ? = ?)",
914 activity.recipients,
915 ^blocked_ap_ids,
916 activity.actor,
917 ^user.ap_id
918 ),
919 where:
920 fragment(
921 "recipients_contain_blocked_domains(?, ?) = false",
922 activity.recipients,
923 ^domain_blocks
924 ),
925 where:
926 fragment(
927 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
928 activity.data,
929 activity.data,
930 ^blocked_ap_ids
931 ),
932 where:
933 fragment(
934 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
935 activity.actor,
936 ^domain_blocks,
937 activity.actor,
938 ^following_ap_ids
939 ),
940 where:
941 fragment(
942 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
943 o.data,
944 ^domain_blocks,
945 o.data,
946 ^following_ap_ids
947 )
948 )
949 end
950
951 defp restrict_blocked(query, _), do: query
952
953 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
954 from(
955 activity in query,
956 where:
957 fragment(
958 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
959 activity.data,
960 ^[Constants.as_public()]
961 )
962 )
963 end
964
965 defp restrict_unlisted(query, _), do: query
966
967 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
968 from(activity in query, where: activity.id in ^ids)
969 end
970
971 defp restrict_pinned(query, _), do: query
972
973 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
974 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
975
976 from(
977 activity in query,
978 where:
979 fragment(
980 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
981 activity.data,
982 activity.actor,
983 ^muted_reblogs
984 )
985 )
986 end
987
988 defp restrict_muted_reblogs(query, _), do: query
989
990 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
991 from(
992 activity in query,
993 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
994 )
995 end
996
997 defp restrict_instance(query, _), do: query
998
999 defp restrict_filtered(query, %{user: %User{} = user}) do
1000 case Filter.compose_regex(user) do
1001 nil ->
1002 query
1003
1004 regex ->
1005 from([activity, object] in query,
1006 where:
1007 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1008 activity.actor == ^user.ap_id
1009 )
1010 end
1011 end
1012
1013 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1014 restrict_filtered(query, %{user: user})
1015 end
1016
1017 defp restrict_filtered(query, _), do: query
1018
1019 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1020
1021 defp exclude_poll_votes(query, _) do
1022 if has_named_binding?(query, :object) do
1023 from([activity, object: o] in query,
1024 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1025 )
1026 else
1027 query
1028 end
1029 end
1030
1031 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1032
1033 defp exclude_chat_messages(query, _) do
1034 if has_named_binding?(query, :object) do
1035 from([activity, object: o] in query,
1036 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1037 )
1038 else
1039 query
1040 end
1041 end
1042
1043 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1044
1045 defp exclude_invisible_actors(query, _opts) do
1046 invisible_ap_ids =
1047 User.Query.build(%{invisible: true, select: [:ap_id]})
1048 |> Repo.all()
1049 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1050
1051 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1052 end
1053
1054 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1055 from(activity in query, where: activity.id != ^id)
1056 end
1057
1058 defp exclude_id(query, _), do: query
1059
1060 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1061
1062 defp maybe_preload_objects(query, _) do
1063 query
1064 |> Activity.with_preloaded_object()
1065 end
1066
1067 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1068
1069 defp maybe_preload_bookmarks(query, opts) do
1070 query
1071 |> Activity.with_preloaded_bookmark(opts[:user])
1072 end
1073
1074 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1075 query
1076 |> Activity.with_preloaded_report_notes()
1077 end
1078
1079 defp maybe_preload_report_notes(query, _), do: query
1080
1081 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1082
1083 defp maybe_set_thread_muted_field(query, opts) do
1084 query
1085 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1086 end
1087
1088 defp maybe_order(query, %{order: :desc}) do
1089 query
1090 |> order_by(desc: :id)
1091 end
1092
1093 defp maybe_order(query, %{order: :asc}) do
1094 query
1095 |> order_by(asc: :id)
1096 end
1097
1098 defp maybe_order(query, _), do: query
1099
1100 defp fetch_activities_query_ap_ids_ops(opts) do
1101 source_user = opts[:muting_user]
1102 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1103
1104 ap_id_relationships =
1105 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1106 [:block | ap_id_relationships]
1107 else
1108 ap_id_relationships
1109 end
1110
1111 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1112
1113 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1114 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1115
1116 restrict_muted_reblogs_opts =
1117 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1118
1119 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1120 end
1121
1122 def fetch_activities_query(recipients, opts \\ %{}) do
1123 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1124 fetch_activities_query_ap_ids_ops(opts)
1125
1126 config = %{
1127 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1128 }
1129
1130 Activity
1131 |> maybe_preload_objects(opts)
1132 |> maybe_preload_bookmarks(opts)
1133 |> maybe_preload_report_notes(opts)
1134 |> maybe_set_thread_muted_field(opts)
1135 |> maybe_order(opts)
1136 |> restrict_recipients(recipients, opts[:user])
1137 |> restrict_replies(opts)
1138 |> restrict_tag(opts)
1139 |> restrict_tag_reject(opts)
1140 |> restrict_tag_all(opts)
1141 |> restrict_since(opts)
1142 |> restrict_local(opts)
1143 |> restrict_remote(opts)
1144 |> restrict_actor(opts)
1145 |> restrict_type(opts)
1146 |> restrict_state(opts)
1147 |> restrict_favorited_by(opts)
1148 |> restrict_blocked(restrict_blocked_opts)
1149 |> restrict_muted(restrict_muted_opts)
1150 |> restrict_filtered(opts)
1151 |> restrict_media(opts)
1152 |> restrict_visibility(opts)
1153 |> restrict_thread_visibility(opts, config)
1154 |> restrict_reblogs(opts)
1155 |> restrict_pinned(opts)
1156 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1157 |> restrict_instance(opts)
1158 |> restrict_announce_object_actor(opts)
1159 |> restrict_filtered(opts)
1160 |> Activity.restrict_deactivated_users()
1161 |> exclude_poll_votes(opts)
1162 |> exclude_chat_messages(opts)
1163 |> exclude_invisible_actors(opts)
1164 |> exclude_visibility(opts)
1165 end
1166
1167 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1168 list_memberships = Pleroma.List.memberships(opts[:user])
1169
1170 fetch_activities_query(recipients ++ list_memberships, opts)
1171 |> Pagination.fetch_paginated(opts, pagination)
1172 |> Enum.reverse()
1173 |> maybe_update_cc(list_memberships, opts[:user])
1174 end
1175
1176 @doc """
1177 Fetch favorites activities of user with order by sort adds to favorites
1178 """
1179 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1180 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1181 user.ap_id
1182 |> Activity.Queries.by_actor()
1183 |> Activity.Queries.by_type("Like")
1184 |> Activity.with_joined_object()
1185 |> Object.with_joined_activity()
1186 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1187 |> order_by([like, _, _], desc_nulls_last: like.id)
1188 |> Pagination.fetch_paginated(
1189 Map.merge(params, %{skip_order: true}),
1190 pagination
1191 )
1192 end
1193
1194 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1195 Enum.map(activities, fn
1196 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1197 if Enum.any?(bcc, &(&1 in list_memberships)) do
1198 update_in(activity.data["cc"], &[user_ap_id | &1])
1199 else
1200 activity
1201 end
1202
1203 activity ->
1204 activity
1205 end)
1206 end
1207
1208 defp maybe_update_cc(activities, _, _), do: activities
1209
1210 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1211 from(activity in query,
1212 where:
1213 fragment("? && ?", activity.recipients, ^recipients) or
1214 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1215 ^Constants.as_public() in activity.recipients)
1216 )
1217 end
1218
1219 def fetch_activities_bounded(
1220 recipients,
1221 recipients_with_public,
1222 opts \\ %{},
1223 pagination \\ :keyset
1224 ) do
1225 fetch_activities_query([], opts)
1226 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1227 |> Pagination.fetch_paginated(opts, pagination)
1228 |> Enum.reverse()
1229 end
1230
1231 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1232 def upload(file, opts \\ []) do
1233 with {:ok, data} <- Upload.store(file, opts) do
1234 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1235
1236 Repo.insert(%Object{data: obj_data})
1237 end
1238 end
1239
1240 @spec get_actor_url(any()) :: binary() | nil
1241 defp get_actor_url(url) when is_binary(url), do: url
1242 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1243
1244 defp get_actor_url(url) when is_list(url) do
1245 url
1246 |> List.first()
1247 |> get_actor_url()
1248 end
1249
1250 defp get_actor_url(_url), do: nil
1251
1252 defp object_to_user_data(data) do
1253 avatar =
1254 data["icon"]["url"] &&
1255 %{
1256 "type" => "Image",
1257 "url" => [%{"href" => data["icon"]["url"]}]
1258 }
1259
1260 banner =
1261 data["image"]["url"] &&
1262 %{
1263 "type" => "Image",
1264 "url" => [%{"href" => data["image"]["url"]}]
1265 }
1266
1267 fields =
1268 data
1269 |> Map.get("attachment", [])
1270 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1271 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1272
1273 emojis =
1274 data
1275 |> Map.get("tag", [])
1276 |> Enum.filter(fn
1277 %{"type" => "Emoji"} -> true
1278 _ -> false
1279 end)
1280 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1281 {String.trim(name, ":"), url}
1282 end)
1283
1284 is_locked = data["manuallyApprovesFollowers"] || false
1285 capabilities = data["capabilities"] || %{}
1286 accepts_chat_messages = capabilities["acceptsChatMessages"]
1287 data = Transmogrifier.maybe_fix_user_object(data)
1288 is_discoverable = data["discoverable"] || false
1289 invisible = data["invisible"] || false
1290 actor_type = data["type"] || "Person"
1291
1292 public_key =
1293 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1294 data["publicKey"]["publicKeyPem"]
1295 else
1296 nil
1297 end
1298
1299 shared_inbox =
1300 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1301 data["endpoints"]["sharedInbox"]
1302 else
1303 nil
1304 end
1305
1306 user_data = %{
1307 ap_id: data["id"],
1308 uri: get_actor_url(data["url"]),
1309 ap_enabled: true,
1310 banner: banner,
1311 fields: fields,
1312 emoji: emojis,
1313 is_locked: is_locked,
1314 is_discoverable: is_discoverable,
1315 invisible: invisible,
1316 avatar: avatar,
1317 name: data["name"],
1318 follower_address: data["followers"],
1319 following_address: data["following"],
1320 bio: data["summary"] || "",
1321 actor_type: actor_type,
1322 also_known_as: Map.get(data, "alsoKnownAs", []),
1323 public_key: public_key,
1324 inbox: data["inbox"],
1325 shared_inbox: shared_inbox,
1326 accepts_chat_messages: accepts_chat_messages
1327 }
1328
1329 # nickname can be nil because of virtual actors
1330 if data["preferredUsername"] do
1331 Map.put(
1332 user_data,
1333 :nickname,
1334 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1335 )
1336 else
1337 Map.put(user_data, :nickname, nil)
1338 end
1339 end
1340
1341 def fetch_follow_information_for_user(user) do
1342 with {:ok, following_data} <-
1343 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1344 {:ok, hide_follows} <- collection_private(following_data),
1345 {:ok, followers_data} <-
1346 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1347 {:ok, hide_followers} <- collection_private(followers_data) do
1348 {:ok,
1349 %{
1350 hide_follows: hide_follows,
1351 follower_count: normalize_counter(followers_data["totalItems"]),
1352 following_count: normalize_counter(following_data["totalItems"]),
1353 hide_followers: hide_followers
1354 }}
1355 else
1356 {:error, _} = e -> e
1357 e -> {:error, e}
1358 end
1359 end
1360
1361 defp normalize_counter(counter) when is_integer(counter), do: counter
1362 defp normalize_counter(_), do: 0
1363
1364 def maybe_update_follow_information(user_data) do
1365 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1366 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1367 {_, true} <-
1368 {:collections_available,
1369 !!(user_data[:following_address] && user_data[:follower_address])},
1370 {:ok, info} <-
1371 fetch_follow_information_for_user(user_data) do
1372 info = Map.merge(user_data[:info] || %{}, info)
1373
1374 user_data
1375 |> Map.put(:info, info)
1376 else
1377 {:user_type_check, false} ->
1378 user_data
1379
1380 {:collections_available, false} ->
1381 user_data
1382
1383 {:enabled, false} ->
1384 user_data
1385
1386 e ->
1387 Logger.error(
1388 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1389 )
1390
1391 user_data
1392 end
1393 end
1394
1395 defp collection_private(%{"first" => %{"type" => type}})
1396 when type in ["CollectionPage", "OrderedCollectionPage"],
1397 do: {:ok, false}
1398
1399 defp collection_private(%{"first" => first}) do
1400 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1401 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1402 {:ok, false}
1403 else
1404 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1405 {:error, _} = e -> e
1406 e -> {:error, e}
1407 end
1408 end
1409
1410 defp collection_private(_data), do: {:ok, true}
1411
1412 def user_data_from_user_object(data) do
1413 with {:ok, data} <- MRF.filter(data) do
1414 {:ok, object_to_user_data(data)}
1415 else
1416 e -> {:error, e}
1417 end
1418 end
1419
1420 def fetch_and_prepare_user_from_ap_id(ap_id) do
1421 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1422 {:ok, data} <- user_data_from_user_object(data) do
1423 {:ok, maybe_update_follow_information(data)}
1424 else
1425 # If this has been deleted, only log a debug and not an error
1426 {:error, "Object has been deleted" = e} ->
1427 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1428 {:error, e}
1429
1430 {:error, {:reject, reason} = e} ->
1431 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1432 {:error, e}
1433
1434 {:error, e} ->
1435 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1436 {:error, e}
1437 end
1438 end
1439
1440 def maybe_handle_clashing_nickname(data) do
1441 with nickname when is_binary(nickname) <- data[:nickname],
1442 %User{} = old_user <- User.get_by_nickname(nickname),
1443 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1444 Logger.info(
1445 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1446 data[:ap_id]
1447 }, renaming."
1448 )
1449
1450 old_user
1451 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1452 |> User.update_and_set_cache()
1453 else
1454 {:ap_id_comparison, true} ->
1455 Logger.info(
1456 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1457 )
1458
1459 _ ->
1460 nil
1461 end
1462 end
1463
1464 def make_user_from_ap_id(ap_id) do
1465 user = User.get_cached_by_ap_id(ap_id)
1466
1467 if user && !User.ap_enabled?(user) do
1468 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1469 else
1470 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1471 if user do
1472 user
1473 |> User.remote_user_changeset(data)
1474 |> User.update_and_set_cache()
1475 else
1476 maybe_handle_clashing_nickname(data)
1477
1478 data
1479 |> User.remote_user_changeset()
1480 |> Repo.insert()
1481 |> User.set_cache()
1482 end
1483 end
1484 end
1485 end
1486
1487 def make_user_from_nickname(nickname) do
1488 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1489 make_user_from_ap_id(ap_id)
1490 else
1491 _e -> {:error, "No AP id in WebFinger"}
1492 end
1493 end
1494
1495 # filter out broken threads
1496 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1497 entire_thread_visible_for_user?(activity, user)
1498 end
1499
1500 # do post-processing on a specific activity
1501 def contain_activity(%Activity{} = activity, %User{} = user) do
1502 contain_broken_threads(activity, user)
1503 end
1504
1505 def fetch_direct_messages_query do
1506 Activity
1507 |> restrict_type(%{type: "Create"})
1508 |> restrict_visibility(%{visibility: "direct"})
1509 |> order_by([activity], asc: activity.id)
1510 end
1511 end