Deletions: allow deactivated users to be deleted
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Extracts the addressing of an activity as `{recipients, to, cc}`.
#
# `recipients` is the concatenation of "to", "cc" and "bcc" (each defaulting
# to an empty list when the key is absent). For "Create" activities the actor
# is added as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` maps a missing or nil actor to "no extra recipient" instead
  # of injecting `[]`/`nil` into the recipients list.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# Tells whether the actor of an activity is allowed to insert it.
#
# "Delete" and "Undo" activities are always allowed. Otherwise, when the
# actor is given as a binary AP id, the cached user must exist and be active.
# Activities without a binary actor are allowed.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
66
# Checks the embedded object's content against the configured
# `[:instance, :remote_limit]` length.
#
# Only binary content is measured: `String.length/1` raises on anything else,
# so non-binary (or absent) content passes the check here.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
73
@doc """
Increases the note count of `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when the object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
77
@doc """
Decreases the note count of `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when the object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
81
# Bumps the replies counter of the replied-to object for public "Create"
# activities whose object carries "inReplyTo". Anything else is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
92
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@doc """
Persists `object` and returns `{:ok, stored, meta}`.

Data whose "type" is one of `@object_types` is stored through
`Object.create/1`; everything else is inserted as an `Activity`. The `:local`
key is required in `meta` for the activity path.
"""
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
117
@doc """
Inserts the activity described by `map`.

If `Activity.normalize/1` already finds an activity for `map`, that activity
is returned. Otherwise the data goes through the actor check (skippable with
`bypass_actor_check`), the remote limit check, the MRF filter and the
containment check before the object and the activity are stored.

With `fake` set to `true` nothing is stored and an activity struct with the
id `"pleroma:fakeid"` is returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(data)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data),
       {:ok, activity} <- insert_activity_with_expiration(data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # An activity for this data already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:containment, _} = failure ->
      failure

    {:error, _} = failure ->
      failure
  end
end
170
# Inserts an `Activity` built from `data`, then hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
183
@doc """
Creates notifications for `activity`, creates or bumps its conversation and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
192
# Enqueues a `PurgeExpiredActivity` job when the activity data carries an
# "expires_at" `DateTime`. Returns `{:ok, activity}` on success or when there
# is nothing to schedule; a failed enqueue is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
206
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`, or the first non-matching
# intermediate result.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
214
# Returns the freshly preloaded participations of an `{:ok, conversation}`
# result, or an empty list for anything else.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
222
@doc """
Streams `participations` (with their users preloaded) on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

@doc """
Streams the participations of the conversation that `object` belongs to.

Nothing is streamed unless a conversation exists for the object's context and
a latest direct activity id can be fetched for `user` in it. Arguments that
are not an object with a "context" result in `:noop`.
"""
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
          user: user,
          blocking_user: user
        })

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
250
@doc """
Streams "Create", "Announce" and "Delete" activities to their topics.

Any other argument results in `:noop`.
"""
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
263
@doc """
Runs the creation of a "Create" activity inside a database transaction.

Returns the result produced inside a successful transaction; a failed
transaction is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
270
# Builds and inserts a "Create" activity, then updates counters, notifies,
# streams and federates it. Fake activities and benchmark-environment
# ("quick insert") runs return early. `{:error, _}` results roll the
# surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
303
@doc """
Builds listen data from `params`, inserts it as an activity, then notifies,
streams and federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
323
@doc """
Runs the unfollow of `followed` by `follower` inside a database transaction.

Returns the result produced inside a successful transaction; a failed
transaction is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
332
# Marks the latest follow activity as "cancelled" and inserts, notifies,
# streams and federates the unfollow activity. Returns `nil` when a step
# yields `nil`; `{:error, _}` results roll the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
346
@doc """
Runs the creation of a report (flag) activity inside a database transaction.

Returns the result produced inside a successful transaction; a failed
transaction is returned as-is.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
353
# Inserts a flag activity, notifies and streams it, federates a stripped copy
# and e-mails the report to every superuser that has an e-mail address and is
# not the reporting actor. `{:error, _}` results roll the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  # The reported account is only addressed when the report is forwarded.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.filter(fn user -> user.ap_id != actor and not is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
396
@doc """
Inserts a "Move" activity from `origin` to `target`, federates it and
enqueues the "move_following" background job.

Fails with an error tuple when `origin` is not listed in the target's
`also_known_as`.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    failure -> failure
  end
end
422
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Without `opts[:user]` only publicly addressed activities are matched; with a
user, activities addressed to them or to anyone they follow are matched too.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  context_filter =
    dynamic(
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(^context_filter)
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
452
@doc """
Fetches all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
459
@doc """
Returns the id of the newest direct-visibility activity in `context`, or
`nil` when there is none. Preloads are skipped unless `opts` overrides
`:skip_preload`.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
470
@doc """
Fetches a page of publicly addressed activities.

The `:user` option is dropped before querying. Unlisted activities are only
excluded when `opts` sets `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
480
@doc """
Fetches a page of public activities with `restrict_unlisted: true` set.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  restricted_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(restricted_opts, pagination)
end
487
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed visibility is the given
# `:visibility` (a single value or a list of values).
#
# Invalid values are logged and the query is returned unchanged, mirroring
# `exclude_visibility/2`. Returning the logger's result here would hand `:ok`
# to the next step of the query pipeline. A `nil` visibility means "no
# restriction" and is not logged.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
524
# Removes activities whose computed visibility matches `:exclude_visibilities`
# (a single value or a list of values). Invalid values are logged and the
# query is returned unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  valid? = Enum.all?(visibility, &(&1 in @valid_visibilities))

  if valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
567
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped by the given config or by the user.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
582
@doc """
Fetches the activities authored by `user` as seen by `reading_user` and
returns them in reversed order.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
597
@doc """
Fetches the "Create" and "Announce" activities of `user` as seen by
`reading_user`, in reversed order.

With `total: true` in `params` the result is a keyword list whose `:items`
entry is reversed; otherwise the result itself is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
611
# Fetches the "Create"/"Announce" activities authored by `user` for
# `reading_user`. Block and mute filtering for the reader is only enabled
# when the reader does not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
638
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, in
reversed order.

With `total: true` in `params` the result is a keyword list whose `:items`
entry is reversed; otherwise the result itself is reversed.
"""
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])
  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
649
# Fetches "Create"/"Announce" activities for `reading_user` using offset
# pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
660
# Computes the recipient list used to fetch user activities: empty in
# godmode, otherwise the public address plus, when a reading user is given,
# that user and everyone they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
670
# Drops "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
690
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
698
# Drops activities whose object carries any of the `:tag_reject` tags. Needs
# the joined object, so it raises when combined with `skip_preload: true`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
711
# Keeps only activities whose object carries every one of the `:tag_all`
# tags. Needs the joined object, so it raises when combined with
# `skip_preload: true`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
724
# Keeps only activities whose object carries the given `:tag` (a binary) or
# any of the given tags (a list). Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
744
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are matched as well. An empty
# recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
758
# Keeps only local activities when `local_only: true` is set.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
764
# Keeps only non-local activities when `remote: true` is set.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
770
# Keeps only activities whose actor equals `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
776
# Keeps only activities of the given `:type`: a single binary is compared
# for equality, any other value is matched with SQL `ANY`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
786
# Keeps only activities whose data "state" equals `:state`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
792
# Keeps only activities whose object lists `:favorited_by` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
801
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Raises whenever `:only_media` is combined with
# `skip_preload: true`, because the joined object is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
815
# Filters replies according to the options:
#
#   * `exclude_replies: true` - drop every activity whose object has "inReplyTo"
#   * `reply_visibility: "self"` - keep replies only when the filtering user
#     is among the recipients
#   * `reply_visibility: "following"` - see the SQL comments below
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  user_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^user_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
869
# Drops "Announce" activities when `exclude_reblogs: true` is set.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
875
# Hides activities authored by, or addressed to, users muted by the
# `:muting_user` (the user's own activities are kept). Unless preloading is
# skipped, rows with a thread mute for the user are removed as well.
# `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    # The :thread_mute binding only exists when preloading was not skipped.
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
902
# Hides activities related to users or domains blocked by the
# `:blocking_user`. The object is joined first when the query has no
# `:object` binding yet. Actors the user follows are exempt from the domain
# block checks.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  joined =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  joined
  # Not authored by a blocked user.
  |> where([activity, object: o], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # Not addressed to a blocked user, unless authored by the blocking user.
  |> where(
    [activity, object: o],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
956
# With `restrict_unlisted: true`, drops activities that carry the public
# address in their "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end

defp restrict_unlisted(query, _), do: query
970
# With `pinned: true`, keeps only activities whose id is listed in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
976
# Drops "Announce" activities authored by users whose reblogs the
# `:muting_user` has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
993
# Keeps only activities whose actor URL has `:instance` as its host part
# (the third "/"-separated segment).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _), do: query
1002
# Hides activities whose object content matches the regex composed from the
# user's filters, except for the user's own activities. The user is taken
# from `:user`, falling back to `:blocking_user`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
1022
# Drops activities whose object is an "Answer", unless
# `include_poll_votes: true` is set or the query has no `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1034
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true` is set or the query has no `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [activity, object: o],
        fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
      )

    false ->
      query
  end
end
1046
# Drops activities authored by invisible users, unless
# `invisible_actors: true` is set. The AP ids of the invisible users are
# loaded with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1057
# Drops the activity with the given binary `:exclude_id`.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1063
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1070
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1077
# Preloads report notes only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1084
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1091
# Orders by id according to `:order` (`:desc` or `:asc`); any other value
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1103
# Loads the muting user's outgoing relationships with a single call and
# returns three option maps (for the blocked, muted and muted-reblogs
# restrictions) with the loaded AP ids merged in as defaults. Blocks are
# only included when the blocking user is the muting user. Values already
# present in `opts` win over the loaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  }
end
1125
@doc """
Builds the activity query for `recipients`, applying every preload,
ordering, restriction and exclusion step selected by `opts`.

The query is only composed here; running and paginating it is left to the
caller.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  # Relationship ap_ids are resolved once and merged into per-restriction opts.
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_remote(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once; a second identical `restrict_filtered(opts)` stage
  # further down the pipeline was redundant and has been removed.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1170
@doc """
Fetches one page of activities for `recipients` plus the list memberships of
`opts[:user]`.

The paginated result is reversed before `maybe_update_cc/3` is applied.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  list_memberships = Pleroma.List.memberships(viewer)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
1179
@doc """
Fetches the activities that `user` has favourited.

Results are ordered by the id of the `Like` activity, descending, and that id
is exposed as `pagination_id` on every returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  liked_activities =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is set above, so pagination is told to skip its own.
  Pagination.fetch_paginated(liked_activities, Map.put(params, :skip_order, true), pagination)
end
1197
# For every activity whose non-empty "bcc" contains one of the user's list
# memberships, prepends the user's ap_id to the activity's "cc".
# Without list memberships or without a %User{} the activities pass through.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  for activity <- activities do
    case activity do
      %{data: %{"bcc" => [_ | _] = bcc}} ->
        if Enum.any?(bcc, fn address -> address in list_memberships end) do
          update_in(activity.data["cc"], fn cc -> [user_ap_id | cc] end)
        else
          activity
        end

      _ ->
        activity
    end
  end
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1213
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also contain the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1222
@doc """
Fetches one page of activities limited by `fetch_activities_bounded_query/3`.

The base query is built with an empty recipient list; the paginated result is
reversed before it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1234
# Stores `file` through `Upload.store/2` and inserts an Object holding the
# resulting data. `opts[:actor]` is added under "actor" via
# `Maps.put_if_present/3`. Anything other than `{:ok, data}` from the store
# step is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1243
# Extracts a URL string from an actor "url" value, which may be a plain
# string, a map with a binary "href", or a list (only the first element is
# considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    candidates when is_list(candidates) -> candidates |> List.first() |> get_actor_url()
    _ -> nil
  end
end
1255
# Converts a fetched actor document (string-keyed map) into the atom-keyed
# attribute map assembled below.
#
# Malformed entries in "attachment" and "tag" are skipped instead of raising:
# the document comes from a remote server, so a profile field without a
# "type" or an emoji tag without an icon URL / name must not abort the whole
# conversion with a FunctionClauseError.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  fields =
    data
    |> Map.get("attachment", [])
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      # Entries with another type, without a "type", or that are not maps.
      _ -> false
    end)
    |> Enum.map(fn field -> Map.take(field, ["name", "value"]) end)

  emojis =
    data
    |> Map.get("tag", [])
    |> Enum.filter(fn
      # Only keep emoji tags carrying everything Map.new/2 below destructures.
      %{"type" => "Emoji", "icon" => %{"url" => _}, "name" => name} -> is_binary(name)
      _ -> false
    end)
    |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
      {String.trim(name, ":"), url}
    end)

  # These two are read from the document as received, before
  # maybe_fix_user_object/1 rewrites it.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
1344
@doc """
Fetches the remote following and followers collections of `user` and derives
counters and privacy flags from them.

Returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`, or `{:error, reason}` when any step
fails.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_info}
  else
    {:error, _reason} = failure -> failure
    # Anything that is not already an error tuple gets wrapped into one.
    unexpected -> {:error, unexpected}
  end
end
1364
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1367
@doc """
Merges freshly fetched follow information into `user_data[:info]` when
external user synchronization is enabled, the user type is "Person" or
"Service", and both collection addresses are present.

`user_data` is returned unchanged when a precondition is false. Any other
failure is logged as an error and `user_data` is returned unchanged as well.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the type is read from `:type`, while object_to_user_data/1 in
  # this module stores it under `:actor_type`. Confirm which key callers set.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, fetched))
  else
    {precondition, false}
    when precondition in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1398
# Decides whether a follow collection is private.
#
#   * an embedded "first" page of a (Ordered)CollectionPage type -> {:ok, false}
#   * any other "first" value is fetched; a page type -> {:ok, false},
#     a 401/403 error response -> {:ok, true}, other results -> {:error, _}
#   * no "first" key at all -> {:ok, true}
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1415
@doc """
Runs the actor document through `MRF.filter/1` and converts the filtered
result into user data.

Any filter result other than `{:ok, data}` is returned wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1423
@doc """
Fetches the actor at `ap_id`, converts it to user data and optionally adds
follow information.

Returns `{:ok, user_data}` or `{:error, reason}`. The log level of a failure
depends on its reason.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = failure ->
      log_user_fetch_failure(ap_id, reason)
      failure
  end
end

# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = reason) do
  Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
end

defp log_user_fetch_failure(ap_id, {:reject, reason}) do
  Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
end

defp log_user_fetch_failure(ap_id, reason) do
  Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
end
1443
@doc """
Renames an existing user that already holds `data[:nickname]` under a
different ap_id to `"<id>.<nickname>"`.

Nothing is changed when the existing user has the same ap_id (only an info
message is logged), and `nil` is returned when there is no binary nickname or
no existing user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1467
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched: an existing user is updated from it, and a
missing user is inserted after any nickname clash has been handled. A failed
fetch is returned as-is.
"""
def make_user_from_ap_id(ap_id) do
  known_user = User.get_cached_by_ap_id(ap_id)
  needs_upgrade = known_user && !User.ap_enabled?(known_user)

  if needs_upgrade do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if known_user do
          known_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1490
@doc """
Resolves `nickname` through WebFinger and builds the user from the resulting
ap_id. Returns `{:error, "No AP id in WebFinger"}` when no ap_id is found.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1498
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1503
# Post-processing for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1508
@doc """
Builds a query for `Create` activities with direct visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1515 end