Pipeline Ingestion: Note
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`, where `recipients` is the concatenation of
# "to", "cc" and "bcc". For "Create" activities the actor is appended as well
# and duplicates are removed.
#
# Every addressing field is normalised with `List.wrap/1`: a missing or `nil`
# field counts as empty and a bare string counts as a single recipient. A
# "Create" without an "actor" therefore no longer puts an empty list into the
# recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  {to, cc, bcc} = addressing_lists(data)
  recipients = Enum.uniq(to ++ cc ++ bcc ++ List.wrap(data["actor"]))
  {recipients, to, cc}
end

defp get_recipients(data) do
  {to, cc, bcc} = addressing_lists(data)
  {to ++ cc ++ bcc, to, cc}
end

# Reads "to", "cc" and "bcc" from `data`, each coerced to a list.
defp addressing_lists(data) do
  {List.wrap(data["to"]), List.wrap(data["cc"]), List.wrap(data["bcc"])}
end
55
# An activity without an actor has nothing to check, so it passes.
defp check_actor_is_active(nil), do: true

# A named actor passes only when the cached user for that AP id is active.
defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
64
# Compares the length of the embedded object's "content" with the configured
# `[:instance, :remote_limit]`. Data without a non-nil "content" inside an
# embedded object map always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_), do: true
71
# Increments the actor's note count when `object` is public; otherwise the
# actor is handed back unchanged as `{:ok, actor}`.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# Decrements the actor's note count when `object` is public; otherwise the
# actor is handed back unchanged as `{:ok, actor}`.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
79
# Bumps the reply counter of the object that a public "Create" replies to.
# A non-public reply yields `nil`; anything that is not a "Create" carrying an
# "inReplyTo" is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => reply_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(reply_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
90
# Types that are persisted as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article Note]

# Persists an object of one of the `@object_types` via `Object.create/1`.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, object} -> {:ok, object, meta}
    other -> other
  end
end

# Persists everything else as an activity. `meta` must carry `:local`.
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
115
# Inserts an activity map unless `Activity.normalize/1` already finds it.
#
# The steps run in order: defaults, actor check (skippable through
# `bypass_actor_check`), remote content limit, MRF filtering, containment,
# object insertion and finally activity insertion. With `fake` set, nothing is
# stored and an in-memory activity with a fake id is returned instead.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    with_object = Maps.put_if_present(activity, :object, object)
    start_rich_media_fetch(with_object)
    {:ok, with_object}
  else
    # the activity is already known
    %Activity{} = existing ->
      {:ok, existing}

    {:error, _} = error ->
      error

    {:containment, _} = error ->
      error

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}
  end
end

# Starts the rich media fetch for `activity` in a separate task, inside the
# rich media concurrency limiter.
defp start_rich_media_fetch(activity) do
  ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
    Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
  end)
end
168
# Stores the activity and, once stored, schedules its expiration if the data
# asks for one. A failed insert is returned untouched.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
181
# Creates notifications for the activity, creates or bumps its conversation,
# then streams out the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
190
# Enqueues a `PurgeExpiredActivity` job when the activity data carries a
# `DateTime` under "expires_at"; a failed enqueue is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

# Without an expiration date there is nothing to schedule.
defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Whatever does not match along the way (a failed
# conversation result, or no cached user) is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
212
# Returns the freshly (force-)preloaded participations of a conversation
# result, or an empty list for anything that is not `{:ok, conversation}`.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
220
# Streams the given participations, with their users preloaded, on the
# "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
228
# Streams the participations of the conversation that belongs to the object's
# context, but only if `user` can still fetch a direct activity in it.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      viewer_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
248
# Streams "Create", "Announce" and "Delete" activities to their topics.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

# Every other activity is not streamed.
@impl true
def stream_out(_activity), do: :noop
261
# Runs `do_create/2` in a transaction and unwraps a committed result; a
# rolled-back transaction is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
268
# Builds and inserts a "Create" activity, then updates counters, notifies,
# streams and federates. Fake activities stop right after insertion, and in
# the `:benchmark` env everything after the reply counter is skipped. Any
# `{:error, _}` rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} ->
      Repo.rollback(message)

    # early exits: the activity is returned without the remaining steps
    {tag, true, activity} when tag in [:fake, :quick_insert] ->
      {:ok, activity}
  end
end
301
# Builds and inserts a "Listen" activity, then notifies, streams and
# federates it. A failing step is returned unchanged.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local) do
    notify_and_stream(activity)
    with :ok <- maybe_federate(activity), do: {:ok, activity}
  end
end
321
# Runs `do_unfollow/4` in a transaction and unwraps a committed result; a
# rolled-back transaction is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
330
# Marks the latest follow activity as "cancelled" and inserts, streams and
# federates the matching unfollow activity. Returns `nil` when there is no
# follow activity; an `{:error, _}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
344
# Runs `do_flag/1` in a transaction and unwraps a committed result; a
# rolled-back transaction is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
351
# Inserts a report ("Flag") activity, federates a stripped copy of it and
# mails the report to every superuser that has an email address and is not
# the reporter. An `{:error, _}` rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  # a forwarded report is cc'ed to the reported account
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data = make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
394
# Inserts a "Move" activity from `origin` to `target` and enqueues the
# "move_following" background job. The target must list the origin in its
# `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    with {:ok, activity} <- insert(params, local) do
      notify_and_stream(activity)
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
420
# Builds the query for all "Create" activities of a context, newest first.
#
# `opts` may be a map or a keyword list. It is normalised to a map up front
# because the filters applied below select their clause by matching on a map;
# a keyword list would otherwise fall through to their pass-through clauses
# and its options would be silently ignored.
#
# With `opts[:user]` set, the recipients are that user, the users they follow
# and the public address; otherwise only the public address.
def fetch_activities_for_context_query(context, opts) do
  opts = Map.new(opts)
  reader = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if reader,
      do: [reader.ap_id | User.following(reader)] ++ public,
      else: public

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, reader)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
450
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
457
# Returns the id of the newest direct-visibility "Create" activity in the
# context, or `nil` when there is none.
#
# `opts` may be a map or a keyword list, as the spec promises. It is turned
# into a map before merging because `Map.merge/2` raises on a keyword list.
# Preloading is skipped unless `opts` says otherwise.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
468
# Paginates `query` with the extra ordering switched off.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
476
# Fetches a page of activities addressed to `recipients` or to the list
# memberships of `opts[:user]`, reversing the paginated result.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  reader = opts[:user]
  list_memberships = Pleroma.List.memberships(reader)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, reader)
end
485
# Fetches a page of activities addressed to the public address. The `:user`
# option is dropped before the query is built.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> fetch_paginated_optimized(anonymous_opts, pagination)
end
495
# Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
# switched on.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
502
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities of the requested visibility (one value or a
# list of values).
#
# An invalid request is logged and the query is returned unrestricted, the
# same way `exclude_visibility/2` treats invalid input. Previously these
# branches returned the result of `Logger.error/1` instead of a query, which
# broke whatever consumed the result. A `nil` visibility means "no
# restriction" and is not logged.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # `inspect/1` keeps logging safe for values that cannot be interpolated
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
539
# Removes activities of the given visibility (one value or a list of values)
# from `query`. Invalid values are logged and leave the query untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  else
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
582
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped by the third argument or by the user.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
597
# Fetches activities of any type authored by `user`, as seen by
# `reading_user`, and reverses the list returned by `fetch_activities/2`.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
612
# Fetches the "Create"/"Announce" activities of `user` as seen by
# `reading_user`, in reversed order. With `total: true` the result is a
# keyword list whose `:items` entry is reversed instead.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  Enum.reverse(fetch_activities_for_user(user, reading_user, params))
end
626
# Shared fetch behind `fetch_user_activities/3`. Block and mute filtering for
# the reader is applied only when the reader does not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = params[:pagination_type] || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
653
# Fetches "Create"/"Announce" activities visible to `reading_user`, in
# reversed order. With `total: true` the result is a keyword list whose
# `:items` entry is reversed instead.
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(result[:items])
  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  Enum.reverse(fetch_activities_for_reading_user(reading_user, params))
end
664
# Shared fetch behind `fetch_statuses/2`; always uses offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
675
# Recipient list for user timelines: empty in godmode, otherwise the public
# address plus, for a known reader, the reader and the users they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  case reading_user do
    absent when absent in [nil, false] ->
      [Constants.as_public()]

    reader ->
      [Constants.as_public(), reader.ap_id | User.following(reader)]
  end
end
685
# Drops "Announce" activities whose announced object was authored by the
# filtering user. Needs the joined object, so it raises without preloading.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
705
# Keeps only activities with an id above `since_id`; an empty string or a
# missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
713
# Keeps activities whose object's embedded "tag" contains all given tags. A
# single binary tag is delegated to the "any" variant.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
730
# Keeps activities whose object's embedded "tag" contains any of the given
# tags. A single binary tag is wrapped into a list first.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
747
# Drops activities whose object's embedded "tag" contains any of the rejected
# tags. A single binary tag is wrapped into a list first.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
765
# Query for the distinct ids of objects linked to any hashtag named in `tags`.
defp object_ids_query_for_tags(tags) do
  from(
    hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
773
# Keeps activities whose object is linked to all of the given hashtags. A
# one-element list is delegated to the "any" variant and a single binary tag
# is wrapped into a list first.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
804
# Keeps activities whose object is linked to any of the given hashtags. The
# hashtag ids are looked up first and the query is then joined against
# "hashtags_objects", distinct and ordered by object id, descending.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    inner_join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
830
# Drops activities whose object is linked to any of the rejected hashtags. A
# single binary tag is wrapped into a list first.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
847
# Raised by filters that need the joined object while preloading is skipped.
defp raise_on_missing_preload,
  do: raise("Can't use the child object without preloading!")
851
# Keeps activities whose recipients overlap with `recipients`; with a user
# given, activities authored by that user are kept as well. An empty
# recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
865
# With `local_only: true`, keeps only local activities.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
871
# With `remote: true`, keeps only non-local activities.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
877
# With `:actor_id` given, keeps only activities of that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
883
# Keeps activities of the given type: a binary is compared for equality, any
# other value is matched with `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
893
# With `:state` given, keeps only activities whose data has that "state".
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
899
# Keeps activities whose object lists `ap_id` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
908
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, so it raises without
# preloading.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
922
# Reply filtering:
#   * `exclude_replies: true`        - drops every reply
#   * `reply_visibility: "self"`      - keeps non-replies and replies addressed
#                                       to the filtering user
#   * `reply_visibility: "following"` - see the SQL comments in that clause
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
976
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
982
# Drops activities by muted actors and activities addressed ("to") to muted
# actors, unless the muting user wrote them. Unless preloading is skipped,
# rows with a thread-mute record are dropped as well. `with_muted: true`
# disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1009
# Applies the blocking user's user blocks and domain blocks. The object is
# joined first if the query has no `:object` binding yet. Each `where` below
# is one independent condition; all of them must hold.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  with_object
  # the actor is not blocked
  |> where(
    [activity, object: o],
    fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids)
  )
  # no blocked recipient, unless the blocking user is the actor
  |> where(
    [activity, object: o],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # no recipient on a blocked domain
  |> where(
    [activity, object: o],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  # not an Announce addressed to a blocked user
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # the actor's domain is not blocked, unless the actor is followed
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # the object actor's domain is not blocked, unless that actor is followed
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1063
# With `restrict_unlisted: true`, drops activities that carry the public
# address in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1077
# With `pinned: true`, keeps only the activities whose id is among the pinned
# ids.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
1083
# Drops "Announce" activities by actors whose reblogs the muting user has
# muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1100
# With a binary `:instance`, keeps only activities whose actor URL has that
# value as its third "/"-separated segment (the host part of the URL).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _opts), do: query
1109
# Drops activities whose object content matches the user's composed filter
# regex (`Filter.compose_regex/1`), except activities authored by that user.
#
# The user is taken from `:user`, falling back to `:blocking_user`. When no
# regex is composed (`nil`) or no user is given, the query is returned as is.
#
# The object is addressed through the named `:object` binding and joined on
# demand, the same way `restrict_blocked/2` does it. A positional binding
# would silently point at whichever table happens to be joined first.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      query =
        if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

      from([activity, object: o] in query,
        where:
          fragment("not(?->>'content' ~* ?)", o.data, ^regex) or
            activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1129
# Unless `include_poll_votes: true` is given, drops activities whose object is
# of type "Answer". Only applies when the query has an `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1141
# Unless `include_chat_messages: true` is given, drops activities whose object
# is of type "ChatMessage". Only applies when the query has an `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1153
# Unless `invisible_actors: true` is given, drops activities authored by users
# flagged as invisible. The AP ids of those users are loaded eagerly here.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  hidden_actor_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^hidden_actor_ids)
end
1164
# With a binary `:exclude_id`, drops the activity carrying that id.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1170
# Preloads the object of each activity unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1177
# Preloads the bookmark of `opts[:user]` for each activity unless
# `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1184
# Preloads report notes only when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1191
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts[:muting_user] || opts[:user])
end
1198
# Orders by id when `:order` is `:desc` or `:asc`; otherwise leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1210
# Normalizes the hashtag options (`:tag`, `:tag_all`, `:tag_reject`) through
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Absent or other values are left alone.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn tag_key, acc ->
    case acc[tag_key] do
      names when is_list(names) ->
        Map.put(acc, tag_key, names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq())

      name when is_bitstring(name) ->
        Map.put(acc, tag_key, Hashtag.normalize_name(name))

      _other ->
        acc
    end
  end)
end
1230
# Preloads the relationship AP ids needed by the block/mute restrictions in a
# single `User.outgoing_relationships_ap_ids/2` call.
#
# Mutes and reblog mutes are requested when `:muting_user` is set; blocks are
# added only when `:blocking_user` is that same user. Returns three option
# maps (blocked, muted, reblog-muted), each being `opts` plus the matching
# preloaded list. A value already present in `opts` under that key wins.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == muting_user,
      do: [:block | mute_relationships],
      else: mute_relationships

  preloaded = User.outgoing_relationships_ap_ids(muting_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1252
@doc """
Builds the query used to fetch activities addressed to `recipients`.

`opts` is a map of filtering, preloading and ordering options. The hashtag
options (`:tag`, `:tag_all`, `:tag_reject`) are normalized first, and the
block/mute relationship lists are preloaded once and handed to the matching
restrictions.

Hashtag filtering uses the hashtag tables when the
`:improved_hashtag_timeline` feature is enabled and the tags embedded in the
object otherwise.

Returns the composed query for the caller to run.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied exactly once: a second application only
  # recomposes the same filter regex and repeats an identical WHERE clause.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1309
@doc """
Fetches the activities favourited by `user`, most recently favourited first.

Starts from the user's "Like" activities, joins each liked object and the
activity joined to that object, and orders by the id of the Like. Every
returned activity carries its object and has `pagination_id` set to the
Like's id, so pagination follows the order in which favourites were added.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # Ordering is already fixed above, so the paginator must not add its own.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1327
# For a non-empty `list_memberships`, prepends the user's AP id to the "cc" of
# every activity whose non-empty "bcc" mentions one of those lists. All other
# activities, and all activities when there are no memberships, pass through.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  for activity <- activities do
    case activity do
      %{data: %{"bcc" => [_ | _] = bcc}} ->
        addressed_via_list? = Enum.any?(bcc, fn address -> address in list_memberships end)

        if addressed_via_list? do
          update_in(activity.data["cc"], fn cc -> [user_ap_id | cc] end)
        else
          activity
        end

      _other ->
        activity
    end
  end
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1343
# Keeps activities that are addressed to any of `recipients`, or that are
# public and addressed to any of `recipients_with_public`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1352
@doc """
Fetches one page of activities bounded by two recipient lists.

An activity qualifies when it is addressed to any of `recipients`, or when it
is public and addressed to any of `recipients_with_public`. `opts` is applied
both to the base query and to pagination. The fetched page is returned in
reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1364
@doc """
Stores `file` through `Upload.store/2` and inserts an object for it.

When `opts[:actor]` is present it is recorded as the object's "actor". Any
non-`{:ok, data}` result of the store step is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1373
# Extracts a profile URL from the actor's "url" property, which may be a plain
# string, a link map with a string "href", or a list of either (the first
# element is used). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1385
# Rewrites an image map that has a "url" into the internal
# `%{"type" => "Image", "url" => [%{"href" => url}]}` shape. For a list the
# first element is used; anything else (including an empty list) yields nil.
defp normalize_image(%{"url" => url}), do: %{"type" => "Image", "url" => [%{"href" => url}]}
defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_other), do: nil
1395
# Converts a fetched actor document (string-keyed map) into the atom-keyed
# attribute map that is later fed to the remote user changeset.
#
# The document comes from a remote server, so malformed profile fields and
# emoji tags are skipped instead of raising:
#   * "attachment" and "tag" may be a list, a single map, or missing;
#   * only attachments of type "PropertyValue" become fields;
#   * only "Emoji" tags that carry an icon URL and a string name are used.
#
# `:nickname` is `preferredUsername@host` (host taken from the actor id), or
# nil when the actor has no preferred username.
defp object_to_user_data(data) do
  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(Map.get(data, "tag", [])),
        is_binary(name),
        into: %{},
        do: {String.trim(name, ":"), url}

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1470
@doc """
Fetches the remote following and followers collections of `user`.

On success returns `{:ok, map}` with `:following_count` and `:follower_count`
(the collections' integer "totalItems", or 0 when that is not an integer) and
`:hide_follows` / `:hide_followers`, as decided by `collection_private/1` for
each collection. Any failing step is returned as an `{:error, _}` tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following, hide_follows} <- fetch_collection_with_privacy(user.following_address),
       {:ok, followers, hide_followers} <- fetch_collection_with_privacy(user.follower_address) do
    {:ok,
     %{
       hide_follows: hide_follows,
       follower_count: normalize_counter(followers["totalItems"]),
       following_count: normalize_counter(following["totalItems"]),
       hide_followers: hide_followers
     }}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end

# Fetches one collection and determines whether it is private. Non-matching
# results of either step are passed back untouched for the caller to wrap.
defp fetch_collection_with_privacy(address) do
  with {:ok, collection} <- Fetcher.fetch_and_contain_remote_object_from_id(address),
       {:ok, private?} <- collection_private(collection) do
    {:ok, collection, private?}
  end
end
1490
# Returns `counter` when it is an integer and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1493
@doc """
Merges remote follow information into `user_data` under `:info`.

The information is only fetched when the `external_user_synchronization`
instance setting is `true`, `user_data[:type]` is "Person" or "Service", and
both the following and follower addresses are present. If one of these checks
is `false`, `user_data` is returned untouched. Any other failure is logged as
an error and `user_data` is returned untouched as well.
"""
# NOTE(review): this reads `user_data[:type]`, while `object_to_user_data/1`
# emits `:actor_type` and no `:type` key. For data coming from that function
# the type check appears to always fail. Confirm the intended key.
def maybe_update_follow_information(user_data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1524
# Decides whether a follow collection is private.
#
#   * an embedded "first" page of a collection-page type -> `{:ok, false}`;
#   * a "first" reference that fetches to such a page -> `{:ok, false}`;
#   * a fetch rejected with HTTP 401 or 403 -> `{:ok, true}`;
#   * any other fetch result -> an `{:error, _}` tuple;
#   * a collection without "first" -> `{:ok, true}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1541
@doc """
Runs the actor document through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter accepts the document. Any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejection -> {:error, rejection}
  end
end
1549
@doc """
Fetches the actor at `ap_id` and turns it into user data.

On success the data is passed through `maybe_update_follow_information/1`
and returned as `{:ok, data}`. Errors are returned as `{:error, reason}` and
logged at a level that depends on the reason: debug for deleted objects, info
for rejections, error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted actor is an expected situation, so it is not logged as an error.
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, rejection}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1569
@doc """
Frees up the nickname in `data` when another user already holds it.

If a user with the same nickname but a different AP id exists, that user is
renamed to `"<id>.<nickname>"` and the result of the update is returned. If
the existing user has the same AP id, this is only logged. When `data` has no
binary nickname or no user holds it, `nil` is returned.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed_nickname = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed_nickname})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1593
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
an already cached user is updated with the fetched data, and a new user is
inserted after any clashing nickname has been dealt with. A failed fetch is
returned as is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, attrs} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, attrs)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(attrs)

          attrs
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end
      end
  end
end
1616
@doc """
Resolves `nickname` through WebFinger and creates the user from the AP id found.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or yields
no AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _failure ->
      {:error, "No AP id in WebFinger"}
  end
end
1624
# Filters out broken threads: the result of `entire_thread_visible_for_user?/2`
# for this activity and user is returned as is.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1629
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1634
@doc """
Builds a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1641 end