Pipeline Ingestion: Page
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38
39 defp get_recipients(%{"type" => "Create"} = data) do
40 to = Map.get(data, "to", [])
41 cc = Map.get(data, "cc", [])
42 bcc = Map.get(data, "bcc", [])
43 actor = Map.get(data, "actor", [])
44 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 {recipients, to, cc}
46 end
47
48 defp get_recipients(data) do
49 to = Map.get(data, "to", [])
50 cc = Map.get(data, "cc", [])
51 bcc = Map.get(data, "bcc", [])
52 recipients = Enum.concat([to, cc, bcc])
53 {recipients, to, cc}
54 end
55
56 defp check_actor_is_active(nil), do: true
57
58 defp check_actor_is_active(actor) when is_binary(actor) do
59 case User.get_cached_by_ap_id(actor) do
60 %User{is_active: true} -> true
61 _ -> false
62 end
63 end
64
65 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
66 limit = Config.get([:instance, :remote_limit])
67 String.length(content) <= limit
68 end
69
70 defp check_remote_limit(_), do: true
71
72 def increase_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
74 end
75
76 def decrease_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
78 end
79
80 defp increase_replies_count_if_reply(%{
81 "object" => %{"inReplyTo" => reply_ap_id} = object,
82 "type" => "Create"
83 }) do
84 if is_public?(object) do
85 Object.increase_replies_count(reply_ap_id)
86 end
87 end
88
89 defp increase_replies_count_if_reply(_create_data), do: :noop
90
91 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
92 @impl true
93 def persist(%{"type" => type} = object, meta) when type in @object_types do
94 with {:ok, object} <- Object.create(object) do
95 {:ok, object, meta}
96 end
97 end
98
99 @impl true
100 def persist(object, meta) do
101 with local <- Keyword.fetch!(meta, :local),
102 {recipients, _, _} <- get_recipients(object),
103 {:ok, activity} <-
104 Repo.insert(%Activity{
105 data: object,
106 local: local,
107 recipients: recipients,
108 actor: object["actor"]
109 }),
110 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
111 {:ok, _} <- maybe_create_activity_expiration(activity) do
112 {:ok, activity, meta}
113 end
114 end
115
116 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
117 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
118 with nil <- Activity.normalize(map),
119 map <- lazy_put_activity_defaults(map, fake),
120 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
121 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
122 {:ok, map} <- MRF.filter(map),
123 {recipients, _, _} = get_recipients(map),
124 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
125 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
126 {:ok, map, object} <- insert_full_object(map),
127 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
128 # Splice in the child object if we have one.
129 activity = Maps.put_if_present(activity, :object, object)
130
131 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
132 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
133 end)
134
135 {:ok, activity}
136 else
137 %Activity{} = activity ->
138 {:ok, activity}
139
140 {:actor_check, _} ->
141 {:error, false}
142
143 {:containment, _} = error ->
144 error
145
146 {:error, _} = error ->
147 error
148
149 {:fake, true, map, recipients} ->
150 activity = %Activity{
151 data: map,
152 local: local,
153 actor: map["actor"],
154 recipients: recipients,
155 id: "pleroma:fakeid"
156 }
157
158 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
159 {:ok, activity}
160
161 {:remote_limit_pass, _} ->
162 {:error, :remote_limit}
163
164 {:reject, _} = e ->
165 {:error, e}
166 end
167 end
168
169 defp insert_activity_with_expiration(data, local, recipients) do
170 struct = %Activity{
171 data: data,
172 local: local,
173 actor: data["actor"],
174 recipients: recipients
175 }
176
177 with {:ok, activity} <- Repo.insert(struct) do
178 maybe_create_activity_expiration(activity)
179 end
180 end
181
182 def notify_and_stream(activity) do
183 Notification.create_notifications(activity)
184
185 conversation = create_or_bump_conversation(activity, activity.actor)
186 participations = get_participations(conversation)
187 stream_out(activity)
188 stream_out_participations(participations)
189 end
190
191 defp maybe_create_activity_expiration(
192 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
193 ) do
194 with {:ok, _job} <-
195 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
196 activity_id: activity.id,
197 expires_at: expires_at
198 }) do
199 {:ok, activity}
200 end
201 end
202
203 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204
205 defp create_or_bump_conversation(activity, actor) do
206 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
207 %User{} = user <- User.get_cached_by_ap_id(actor) do
208 Participation.mark_as_read(user, conversation)
209 {:ok, conversation}
210 end
211 end
212
213 defp get_participations({:ok, conversation}) do
214 conversation
215 |> Repo.preload(:participations, force: true)
216 |> Map.get(:participations)
217 end
218
219 defp get_participations(_), do: []
220
221 def stream_out_participations(participations) do
222 participations =
223 participations
224 |> Repo.preload(:user)
225
226 Streamer.stream("participation", participations)
227 end
228
229 @impl true
230 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
231 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
232 conversation = Repo.preload(conversation, :participations)
233
234 last_activity_id =
235 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
236 user: user,
237 blocking_user: user
238 })
239
240 if last_activity_id do
241 stream_out_participations(conversation.participations)
242 end
243 end
244 end
245
246 @impl true
247 def stream_out_participations(_, _), do: :noop
248
249 @impl true
250 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
251 when data_type in ["Create", "Announce", "Delete"] do
252 activity
253 |> Topics.get_activity_topics()
254 |> Streamer.stream(activity)
255 end
256
257 @impl true
258 def stream_out(_activity) do
259 :noop
260 end
261
262 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
263 def create(params, fake \\ false) do
264 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
265 result
266 end
267 end
268
269 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
270 additional = params[:additional] || %{}
271 # only accept false as false value
272 local = !(params[:local] == false)
273 published = params[:published]
274 quick_insert? = Config.get([:env]) == :benchmark
275
276 create_data =
277 make_create_data(
278 %{to: to, actor: actor, published: published, context: context, object: object},
279 additional
280 )
281
282 with {:ok, activity} <- insert(create_data, local, fake),
283 {:fake, false, activity} <- {:fake, fake, activity},
284 _ <- increase_replies_count_if_reply(create_data),
285 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
286 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
287 _ <- notify_and_stream(activity),
288 :ok <- maybe_federate(activity) do
289 {:ok, activity}
290 else
291 {:quick_insert, true, activity} ->
292 {:ok, activity}
293
294 {:fake, true, activity} ->
295 {:ok, activity}
296
297 {:error, message} ->
298 Repo.rollback(message)
299 end
300 end
301
302 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
303 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
304 additional = params[:additional] || %{}
305 # only accept false as false value
306 local = !(params[:local] == false)
307 published = params[:published]
308
309 listen_data =
310 make_listen_data(
311 %{to: to, actor: actor, published: published, context: context, object: object},
312 additional
313 )
314
315 with {:ok, activity} <- insert(listen_data, local),
316 _ <- notify_and_stream(activity),
317 :ok <- maybe_federate(activity) do
318 {:ok, activity}
319 end
320 end
321
322 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
323 {:ok, Activity.t()} | nil | {:error, any()}
324 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
325 with {:ok, result} <-
326 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
327 result
328 end
329 end
330
331 defp do_unfollow(follower, followed, activity_id, local) do
332 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
333 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
334 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
335 {:ok, activity} <- insert(unfollow_data, local),
336 _ <- notify_and_stream(activity),
337 :ok <- maybe_federate(activity) do
338 {:ok, activity}
339 else
340 nil -> nil
341 {:error, error} -> Repo.rollback(error)
342 end
343 end
344
345 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
346 def flag(params) do
347 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
348 result
349 end
350 end
351
352 defp do_flag(
353 %{
354 actor: actor,
355 context: _context,
356 account: account,
357 statuses: statuses,
358 content: content
359 } = params
360 ) do
361 # only accept false as false value
362 local = !(params[:local] == false)
363 forward = !(params[:forward] == false)
364
365 additional = params[:additional] || %{}
366
367 additional =
368 if forward do
369 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
370 else
371 Map.merge(additional, %{"to" => [], "cc" => []})
372 end
373
374 with flag_data <- make_flag_data(params, additional),
375 {:ok, activity} <- insert(flag_data, local),
376 {:ok, stripped_activity} <- strip_report_status_data(activity),
377 _ <- notify_and_stream(activity),
378 :ok <-
379 maybe_federate(stripped_activity) do
380 User.all_superusers()
381 |> Enum.filter(fn user -> user.ap_id != actor end)
382 |> Enum.filter(fn user -> not is_nil(user.email) end)
383 |> Enum.each(fn superuser ->
384 superuser
385 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
386 |> Pleroma.Emails.Mailer.deliver_async()
387 end)
388
389 {:ok, activity}
390 else
391 {:error, error} -> Repo.rollback(error)
392 end
393 end
394
395 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
396 def move(%User{} = origin, %User{} = target, local \\ true) do
397 params = %{
398 "type" => "Move",
399 "actor" => origin.ap_id,
400 "object" => origin.ap_id,
401 "target" => target.ap_id
402 }
403
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
408
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
412 })
413
414 {:ok, activity}
415 else
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
417 err -> err
418 end
419 end
420
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
423
424 recipients =
425 if opts[:user],
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 else: public
428
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_recipients(recipients, opts[:user])
435 |> restrict_filtered(opts)
436 |> where(
437 [activity],
438 fragment(
439 "?->>'type' = ? and ?->>'context' = ?",
440 activity.data,
441 "Create",
442 activity.data,
443 ^context
444 )
445 )
446 |> exclude_poll_votes(opts)
447 |> exclude_id(opts)
448 |> order_by([activity], desc: activity.id)
449 end
450
451 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
452 def fetch_activities_for_context(context, opts \\ %{}) do
453 context
454 |> fetch_activities_for_context_query(opts)
455 |> Repo.all()
456 end
457
458 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
459 FlakeId.Ecto.CompatType.t() | nil
460 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
461 context
462 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
463 |> restrict_visibility(%{visibility: "direct"})
464 |> limit(1)
465 |> select([a], a.id)
466 |> Repo.one()
467 end
468
469 defp fetch_paginated_optimized(query, opts, pagination) do
470 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
471 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
472 opts = Map.put(opts, :skip_extra_order, true)
473
474 Pagination.fetch_paginated(query, opts, pagination)
475 end
476
477 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
478 list_memberships = Pleroma.List.memberships(opts[:user])
479
480 fetch_activities_query(recipients ++ list_memberships, opts)
481 |> fetch_paginated_optimized(opts, pagination)
482 |> Enum.reverse()
483 |> maybe_update_cc(list_memberships, opts[:user])
484 end
485
486 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
487 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
488 opts = Map.delete(opts, :user)
489
490 [Constants.as_public()]
491 |> fetch_activities_query(opts)
492 |> restrict_unlisted(opts)
493 |> fetch_paginated_optimized(opts, pagination)
494 end
495
496 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
497 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
498 opts
499 |> Map.put(:restrict_unlisted, true)
500 |> fetch_public_or_unlisted_activities(pagination)
501 end
502
503 @valid_visibilities ~w[direct unlisted public private]
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when is_list(visibility) do
507 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
508 from(
509 a in query,
510 where:
511 fragment(
512 "activity_visibility(?, ?, ?) = ANY (?)",
513 a.actor,
514 a.recipients,
515 a.data,
516 ^visibility
517 )
518 )
519 else
520 Logger.error("Could not restrict visibility to #{visibility}")
521 end
522 end
523
524 defp restrict_visibility(query, %{visibility: visibility})
525 when visibility in @valid_visibilities do
526 from(
527 a in query,
528 where:
529 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
530 )
531 end
532
533 defp restrict_visibility(_query, %{visibility: visibility})
534 when visibility not in @valid_visibilities do
535 Logger.error("Could not restrict visibility to #{visibility}")
536 end
537
538 defp restrict_visibility(query, _visibility), do: query
539
540 defp exclude_visibility(query, %{exclude_visibilities: visibility})
541 when is_list(visibility) do
542 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ANY (?)",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 else
555 Logger.error("Could not exclude visibility to #{visibility}")
556 query
557 end
558 end
559
560 defp exclude_visibility(query, %{exclude_visibilities: visibility})
561 when visibility in @valid_visibilities do
562 from(
563 a in query,
564 where:
565 not fragment(
566 "activity_visibility(?, ?, ?) = ?",
567 a.actor,
568 a.recipients,
569 a.data,
570 ^visibility
571 )
572 )
573 end
574
575 defp exclude_visibility(query, %{exclude_visibilities: visibility})
576 when visibility not in [nil | @valid_visibilities] do
577 Logger.error("Could not exclude visibility to #{visibility}")
578 query
579 end
580
581 defp exclude_visibility(query, _visibility), do: query
582
583 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
584 do: query
585
586 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
587 do: query
588
589 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
590 from(
591 a in query,
592 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
593 )
594 end
595
596 defp restrict_thread_visibility(query, _, _), do: query
597
598 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
599 params =
600 params
601 |> Map.put(:user, reading_user)
602 |> Map.put(:actor_id, user.ap_id)
603
604 %{
605 godmode: params[:godmode],
606 reading_user: reading_user
607 }
608 |> user_activities_recipients()
609 |> fetch_activities(params)
610 |> Enum.reverse()
611 end
612
613 def fetch_user_activities(user, reading_user, params \\ %{})
614
615 def fetch_user_activities(user, reading_user, %{total: true} = params) do
616 result = fetch_activities_for_user(user, reading_user, params)
617
618 Keyword.put(result, :items, Enum.reverse(result[:items]))
619 end
620
621 def fetch_user_activities(user, reading_user, params) do
622 user
623 |> fetch_activities_for_user(reading_user, params)
624 |> Enum.reverse()
625 end
626
627 defp fetch_activities_for_user(user, reading_user, params) do
628 params =
629 params
630 |> Map.put(:type, ["Create", "Announce"])
631 |> Map.put(:user, reading_user)
632 |> Map.put(:actor_id, user.ap_id)
633 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
634
635 params =
636 if User.blocks?(reading_user, user) do
637 params
638 else
639 params
640 |> Map.put(:blocking_user, reading_user)
641 |> Map.put(:muting_user, reading_user)
642 end
643
644 pagination_type = Map.get(params, :pagination_type) || :keyset
645
646 %{
647 godmode: params[:godmode],
648 reading_user: reading_user
649 }
650 |> user_activities_recipients()
651 |> fetch_activities(params, pagination_type)
652 end
653
654 def fetch_statuses(reading_user, %{total: true} = params) do
655 result = fetch_activities_for_reading_user(reading_user, params)
656 Keyword.put(result, :items, Enum.reverse(result[:items]))
657 end
658
659 def fetch_statuses(reading_user, params) do
660 reading_user
661 |> fetch_activities_for_reading_user(params)
662 |> Enum.reverse()
663 end
664
665 defp fetch_activities_for_reading_user(reading_user, params) do
666 params = Map.put(params, :type, ["Create", "Announce"])
667
668 %{
669 godmode: params[:godmode],
670 reading_user: reading_user
671 }
672 |> user_activities_recipients()
673 |> fetch_activities(params, :offset)
674 end
675
676 defp user_activities_recipients(%{godmode: true}), do: []
677
678 defp user_activities_recipients(%{reading_user: reading_user}) do
679 if reading_user do
680 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
681 else
682 [Constants.as_public()]
683 end
684 end
685
686 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
688 end
689
690 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
691 from(
692 [activity, object] in query,
693 where:
694 fragment(
695 "?->>'type' != ? or ?->>'actor' != ?",
696 activity.data,
697 "Announce",
698 object.data,
699 ^actor
700 )
701 )
702 end
703
704 defp restrict_announce_object_actor(query, _), do: query
705
706 defp restrict_since(query, %{since_id: ""}), do: query
707
708 defp restrict_since(query, %{since_id: since_id}) do
709 from(activity in query, where: activity.id > ^since_id)
710 end
711
712 defp restrict_since(query, _), do: query
713
714 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
715 raise_on_missing_preload()
716 end
717
718 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
719 from(
720 [_activity, object] in query,
721 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
722 )
723 end
724
725 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
726 restrict_embedded_tag_any(query, %{tag: tag})
727 end
728
729 defp restrict_embedded_tag_all(query, _), do: query
730
731 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
732 raise_on_missing_preload()
733 end
734
735 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
736 from(
737 [_activity, object] in query,
738 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
739 )
740 end
741
742 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
743 restrict_embedded_tag_any(query, %{tag: [tag]})
744 end
745
746 defp restrict_embedded_tag_any(query, _), do: query
747
748 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
749 raise_on_missing_preload()
750 end
751
752 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
753 from(
754 [_activity, object] in query,
755 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
756 )
757 end
758
759 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
760 when is_binary(tag_reject) do
761 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
762 end
763
764 defp restrict_embedded_tag_reject_any(query, _), do: query
765
766 defp object_ids_query_for_tags(tags) do
767 from(hto in "hashtags_objects")
768 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
769 |> where([hto, ht], ht.name in ^tags)
770 |> select([hto], hto.object_id)
771 |> distinct([hto], true)
772 end
773
774 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
775 raise_on_missing_preload()
776 end
777
778 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
779 restrict_hashtag_any(query, %{tag: single_tag})
780 end
781
782 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
783 from(
784 [_activity, object] in query,
785 where:
786 fragment(
787 """
788 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
789 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
790 AND hashtags_objects.object_id = ?) @> ?
791 """,
792 ^tags,
793 object.id,
794 ^tags
795 )
796 )
797 end
798
799 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
800 restrict_hashtag_all(query, %{tag_all: [tag]})
801 end
802
803 defp restrict_hashtag_all(query, _), do: query
804
805 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
806 raise_on_missing_preload()
807 end
808
809 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
810 hashtag_ids =
811 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
812 |> Repo.all()
813
814 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
815 from(
816 [_activity, object] in query,
817 join: hto in "hashtags_objects",
818 on: hto.object_id == object.id,
819 where: hto.hashtag_id in ^hashtag_ids,
820 distinct: [desc: object.id],
821 order_by: [desc: object.id]
822 )
823 end
824
825 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
826 restrict_hashtag_any(query, %{tag: [tag]})
827 end
828
829 defp restrict_hashtag_any(query, _), do: query
830
831 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
832 raise_on_missing_preload()
833 end
834
835 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
836 from(
837 [_activity, object] in query,
838 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
839 )
840 end
841
842 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
843 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
844 end
845
846 defp restrict_hashtag_reject_any(query, _), do: query
847
848 defp raise_on_missing_preload do
849 raise "Can't use the child object without preloading!"
850 end
851
852 defp restrict_recipients(query, [], _user), do: query
853
854 defp restrict_recipients(query, recipients, nil) do
855 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
856 end
857
858 defp restrict_recipients(query, recipients, user) do
859 from(
860 activity in query,
861 where: fragment("? && ?", ^recipients, activity.recipients),
862 or_where: activity.actor == ^user.ap_id
863 )
864 end
865
866 defp restrict_local(query, %{local_only: true}) do
867 from(activity in query, where: activity.local == true)
868 end
869
870 defp restrict_local(query, _), do: query
871
872 defp restrict_remote(query, %{remote: true}) do
873 from(activity in query, where: activity.local == false)
874 end
875
876 defp restrict_remote(query, _), do: query
877
878 defp restrict_actor(query, %{actor_id: actor_id}) do
879 from(activity in query, where: activity.actor == ^actor_id)
880 end
881
882 defp restrict_actor(query, _), do: query
883
884 defp restrict_type(query, %{type: type}) when is_binary(type) do
885 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
886 end
887
888 defp restrict_type(query, %{type: type}) do
889 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
890 end
891
892 defp restrict_type(query, _), do: query
893
894 defp restrict_state(query, %{state: state}) do
895 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
896 end
897
898 defp restrict_state(query, _), do: query
899
900 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
901 from(
902 [_activity, object] in query,
903 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
904 )
905 end
906
907 defp restrict_favorited_by(query, _), do: query
908
909 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
910 raise "Can't use the child object without preloading!"
911 end
912
913 defp restrict_media(query, %{only_media: true}) do
914 from(
915 [activity, object] in query,
916 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
917 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
918 )
919 end
920
921 defp restrict_media(query, _), do: query
922
923 defp restrict_replies(query, %{exclude_replies: true}) do
924 from(
925 [_activity, object] in query,
926 where: fragment("?->>'inReplyTo' is null", object.data)
927 )
928 end
929
930 defp restrict_replies(query, %{
931 reply_filtering_user: %User{} = user,
932 reply_visibility: "self"
933 }) do
934 from(
935 [activity, object] in query,
936 where:
937 fragment(
938 "?->>'inReplyTo' is null OR ? = ANY(?)",
939 object.data,
940 ^user.ap_id,
941 activity.recipients
942 )
943 )
944 end
945
946 defp restrict_replies(query, %{
947 reply_filtering_user: %User{} = user,
948 reply_visibility: "following"
949 }) do
950 from(
951 [activity, object] in query,
952 where:
953 fragment(
954 """
955 ?->>'type' != 'Create' -- This isn't a Create
956 OR ?->>'inReplyTo' is null -- this isn't a reply
957 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
958 -- unless they are the author (because authors
959 -- are also part of the recipients). This leads
960 -- to a bug that self-replies by friends won't
961 -- show up.
962 OR ? = ? -- The actor is us
963 """,
964 activity.data,
965 object.data,
966 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
967 activity.recipients,
968 activity.actor,
969 activity.actor,
970 ^user.ap_id
971 )
972 )
973 end
974
975 defp restrict_replies(query, _), do: query
976
977 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
978 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
979 end
980
981 defp restrict_reblogs(query, _), do: query
982
983 defp restrict_muted(query, %{with_muted: true}), do: query
984
985 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
986 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
987
988 query =
989 from([activity] in query,
990 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
991 where:
992 fragment(
993 "not (?->'to' \\?| ?) or ? = ?",
994 activity.data,
995 ^mutes,
996 activity.actor,
997 ^user.ap_id
998 )
999 )
1000
1001 unless opts[:skip_preload] do
1002 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1003 else
1004 query
1005 end
1006 end
1007
1008 defp restrict_muted(query, _), do: query
1009
1010 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1011 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1012 domain_blocks = user.domain_blocks || []
1013
1014 following_ap_ids = User.get_friends_ap_ids(user)
1015
1016 query =
1017 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1018
1019 from(
1020 [activity, object: o] in query,
1021 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1022 where:
1023 fragment(
1024 "((not (? && ?)) or ? = ?)",
1025 activity.recipients,
1026 ^blocked_ap_ids,
1027 activity.actor,
1028 ^user.ap_id
1029 ),
1030 where:
1031 fragment(
1032 "recipients_contain_blocked_domains(?, ?) = false",
1033 activity.recipients,
1034 ^domain_blocks
1035 ),
1036 where:
1037 fragment(
1038 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1039 activity.data,
1040 activity.data,
1041 ^blocked_ap_ids
1042 ),
1043 where:
1044 fragment(
1045 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1046 activity.actor,
1047 ^domain_blocks,
1048 activity.actor,
1049 ^following_ap_ids
1050 ),
1051 where:
1052 fragment(
1053 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1054 o.data,
1055 ^domain_blocks,
1056 o.data,
1057 ^following_ap_ids
1058 )
1059 )
1060 end
1061
1062 defp restrict_blocked(query, _), do: query
1063
1064 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1065 from(
1066 activity in query,
1067 where:
1068 fragment(
1069 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1070 activity.data,
1071 ^[Constants.as_public()]
1072 )
1073 )
1074 end
1075
1076 defp restrict_unlisted(query, _), do: query
1077
1078 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1079 from(
1080 [activity, object: o] in query,
1081 where:
1082 fragment(
1083 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1084 activity.data,
1085 activity.data,
1086 activity.data,
1087 ^ids
1088 )
1089 )
1090 end
1091
1092 defp restrict_pinned(query, _), do: query
1093
1094 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1095 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1096
1097 from(
1098 activity in query,
1099 where:
1100 fragment(
1101 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1102 activity.data,
1103 activity.actor,
1104 ^muted_reblogs
1105 )
1106 )
1107 end
1108
1109 defp restrict_muted_reblogs(query, _), do: query
1110
1111 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1112 from(
1113 activity in query,
1114 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1115 )
1116 end
1117
1118 defp restrict_instance(query, _), do: query
1119
1120 defp restrict_filtered(query, %{user: %User{} = user}) do
1121 case Filter.compose_regex(user) do
1122 nil ->
1123 query
1124
1125 regex ->
1126 from([activity, object] in query,
1127 where:
1128 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1129 activity.actor == ^user.ap_id
1130 )
1131 end
1132 end
1133
1134 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1135 restrict_filtered(query, %{user: user})
1136 end
1137
1138 defp restrict_filtered(query, _), do: query
1139
1140 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1141
1142 defp exclude_poll_votes(query, _) do
1143 if has_named_binding?(query, :object) do
1144 from([activity, object: o] in query,
1145 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1146 )
1147 else
1148 query
1149 end
1150 end
1151
1152 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1153
1154 defp exclude_chat_messages(query, _) do
1155 if has_named_binding?(query, :object) do
1156 from([activity, object: o] in query,
1157 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1158 )
1159 else
1160 query
1161 end
1162 end
1163
1164 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1165
1166 defp exclude_invisible_actors(query, _opts) do
1167 invisible_ap_ids =
1168 User.Query.build(%{invisible: true, select: [:ap_id]})
1169 |> Repo.all()
1170 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1171
1172 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1173 end
1174
1175 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1176 from(activity in query, where: activity.id != ^id)
1177 end
1178
1179 defp exclude_id(query, _), do: query
1180
1181 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1182
1183 defp maybe_preload_objects(query, _) do
1184 query
1185 |> Activity.with_preloaded_object()
1186 end
1187
1188 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1189
1190 defp maybe_preload_bookmarks(query, opts) do
1191 query
1192 |> Activity.with_preloaded_bookmark(opts[:user])
1193 end
1194
1195 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1196 query
1197 |> Activity.with_preloaded_report_notes()
1198 end
1199
1200 defp maybe_preload_report_notes(query, _), do: query
1201
1202 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1203
1204 defp maybe_set_thread_muted_field(query, opts) do
1205 query
1206 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1207 end
1208
1209 defp maybe_order(query, %{order: :desc}) do
1210 query
1211 |> order_by(desc: :id)
1212 end
1213
1214 defp maybe_order(query, %{order: :asc}) do
1215 query
1216 |> order_by(asc: :id)
1217 end
1218
1219 defp maybe_order(query, _), do: query
1220
1221 defp normalize_fetch_activities_query_opts(opts) do
1222 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1223 case opts[key] do
1224 value when is_bitstring(value) ->
1225 Map.put(opts, key, Hashtag.normalize_name(value))
1226
1227 value when is_list(value) ->
1228 normalized_value =
1229 value
1230 |> Enum.map(&Hashtag.normalize_name/1)
1231 |> Enum.uniq()
1232
1233 Map.put(opts, key, normalized_value)
1234
1235 _ ->
1236 opts
1237 end
1238 end)
1239 end
1240
1241 defp fetch_activities_query_ap_ids_ops(opts) do
1242 source_user = opts[:muting_user]
1243 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1244
1245 ap_id_relationships =
1246 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1247 [:block | ap_id_relationships]
1248 else
1249 ap_id_relationships
1250 end
1251
1252 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1253
1254 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1255 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1256
1257 restrict_muted_reblogs_opts =
1258 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1259
1260 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1261 end
1262
1263 def fetch_activities_query(recipients, opts \\ %{}) do
1264 opts = normalize_fetch_activities_query_opts(opts)
1265
1266 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1267 fetch_activities_query_ap_ids_ops(opts)
1268
1269 config = %{
1270 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1271 }
1272
1273 query =
1274 Activity
1275 |> maybe_preload_objects(opts)
1276 |> maybe_preload_bookmarks(opts)
1277 |> maybe_preload_report_notes(opts)
1278 |> maybe_set_thread_muted_field(opts)
1279 |> maybe_order(opts)
1280 |> restrict_recipients(recipients, opts[:user])
1281 |> restrict_replies(opts)
1282 |> restrict_since(opts)
1283 |> restrict_local(opts)
1284 |> restrict_remote(opts)
1285 |> restrict_actor(opts)
1286 |> restrict_type(opts)
1287 |> restrict_state(opts)
1288 |> restrict_favorited_by(opts)
1289 |> restrict_blocked(restrict_blocked_opts)
1290 |> restrict_muted(restrict_muted_opts)
1291 |> restrict_filtered(opts)
1292 |> restrict_media(opts)
1293 |> restrict_visibility(opts)
1294 |> restrict_thread_visibility(opts, config)
1295 |> restrict_reblogs(opts)
1296 |> restrict_pinned(opts)
1297 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1298 |> restrict_instance(opts)
1299 |> restrict_announce_object_actor(opts)
1300 |> restrict_filtered(opts)
1301 |> Activity.restrict_deactivated_users()
1302 |> exclude_poll_votes(opts)
1303 |> exclude_chat_messages(opts)
1304 |> exclude_invisible_actors(opts)
1305 |> exclude_visibility(opts)
1306
1307 if Config.feature_enabled?(:improved_hashtag_timeline) do
1308 query
1309 |> restrict_hashtag_any(opts)
1310 |> restrict_hashtag_all(opts)
1311 |> restrict_hashtag_reject_any(opts)
1312 else
1313 query
1314 |> restrict_embedded_tag_any(opts)
1315 |> restrict_embedded_tag_all(opts)
1316 |> restrict_embedded_tag_reject_any(opts)
1317 end
1318 end
1319
1320 @doc """
1321 Fetch favorites activities of user with order by sort adds to favorites
1322 """
1323 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1324 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1325 user.ap_id
1326 |> Activity.Queries.by_actor()
1327 |> Activity.Queries.by_type("Like")
1328 |> Activity.with_joined_object()
1329 |> Object.with_joined_activity()
1330 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1331 |> order_by([like, _, _], desc_nulls_last: like.id)
1332 |> Pagination.fetch_paginated(
1333 Map.merge(params, %{skip_order: true}),
1334 pagination
1335 )
1336 end
1337
1338 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1339 Enum.map(activities, fn
1340 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1341 if Enum.any?(bcc, &(&1 in list_memberships)) do
1342 update_in(activity.data["cc"], &[user_ap_id | &1])
1343 else
1344 activity
1345 end
1346
1347 activity ->
1348 activity
1349 end)
1350 end
1351
1352 defp maybe_update_cc(activities, _, _), do: activities
1353
1354 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1355 from(activity in query,
1356 where:
1357 fragment("? && ?", activity.recipients, ^recipients) or
1358 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1359 ^Constants.as_public() in activity.recipients)
1360 )
1361 end
1362
1363 def fetch_activities_bounded(
1364 recipients,
1365 recipients_with_public,
1366 opts \\ %{},
1367 pagination \\ :keyset
1368 ) do
1369 fetch_activities_query([], opts)
1370 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1371 |> Pagination.fetch_paginated(opts, pagination)
1372 |> Enum.reverse()
1373 end
1374
1375 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1376 def upload(file, opts \\ []) do
1377 with {:ok, data} <- Upload.store(file, opts) do
1378 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1379
1380 Repo.insert(%Object{data: obj_data})
1381 end
1382 end
1383
1384 @spec get_actor_url(any()) :: binary() | nil
1385 defp get_actor_url(url) when is_binary(url), do: url
1386 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1387
1388 defp get_actor_url(url) when is_list(url) do
1389 url
1390 |> List.first()
1391 |> get_actor_url()
1392 end
1393
1394 defp get_actor_url(_url), do: nil
1395
1396 defp normalize_image(%{"url" => url}) do
1397 %{
1398 "type" => "Image",
1399 "url" => [%{"href" => url}]
1400 }
1401 end
1402
1403 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1404 defp normalize_image(_), do: nil
1405
1406 defp object_to_user_data(data) do
1407 fields =
1408 data
1409 |> Map.get("attachment", [])
1410 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1411 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1412
1413 emojis =
1414 data
1415 |> Map.get("tag", [])
1416 |> Enum.filter(fn
1417 %{"type" => "Emoji"} -> true
1418 _ -> false
1419 end)
1420 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1421 {String.trim(name, ":"), url}
1422 end)
1423
1424 is_locked = data["manuallyApprovesFollowers"] || false
1425 capabilities = data["capabilities"] || %{}
1426 accepts_chat_messages = capabilities["acceptsChatMessages"]
1427 data = Transmogrifier.maybe_fix_user_object(data)
1428 is_discoverable = data["discoverable"] || false
1429 invisible = data["invisible"] || false
1430 actor_type = data["type"] || "Person"
1431
1432 featured_address = data["featured"]
1433 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1434
1435 public_key =
1436 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1437 data["publicKey"]["publicKeyPem"]
1438 else
1439 nil
1440 end
1441
1442 shared_inbox =
1443 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1444 data["endpoints"]["sharedInbox"]
1445 else
1446 nil
1447 end
1448
1449 user_data = %{
1450 ap_id: data["id"],
1451 uri: get_actor_url(data["url"]),
1452 ap_enabled: true,
1453 banner: normalize_image(data["image"]),
1454 fields: fields,
1455 emoji: emojis,
1456 is_locked: is_locked,
1457 is_discoverable: is_discoverable,
1458 invisible: invisible,
1459 avatar: normalize_image(data["icon"]),
1460 name: data["name"],
1461 follower_address: data["followers"],
1462 following_address: data["following"],
1463 featured_address: featured_address,
1464 bio: data["summary"] || "",
1465 actor_type: actor_type,
1466 also_known_as: Map.get(data, "alsoKnownAs", []),
1467 public_key: public_key,
1468 inbox: data["inbox"],
1469 shared_inbox: shared_inbox,
1470 accepts_chat_messages: accepts_chat_messages,
1471 pinned_objects: pinned_objects
1472 }
1473
1474 # nickname can be nil because of virtual actors
1475 if data["preferredUsername"] do
1476 Map.put(
1477 user_data,
1478 :nickname,
1479 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1480 )
1481 else
1482 Map.put(user_data, :nickname, nil)
1483 end
1484 end
1485
1486 def fetch_follow_information_for_user(user) do
1487 with {:ok, following_data} <-
1488 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1489 {:ok, hide_follows} <- collection_private(following_data),
1490 {:ok, followers_data} <-
1491 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1492 {:ok, hide_followers} <- collection_private(followers_data) do
1493 {:ok,
1494 %{
1495 hide_follows: hide_follows,
1496 follower_count: normalize_counter(followers_data["totalItems"]),
1497 following_count: normalize_counter(following_data["totalItems"]),
1498 hide_followers: hide_followers
1499 }}
1500 else
1501 {:error, _} = e -> e
1502 e -> {:error, e}
1503 end
1504 end
1505
1506 defp normalize_counter(counter) when is_integer(counter), do: counter
1507 defp normalize_counter(_), do: 0
1508
1509 def maybe_update_follow_information(user_data) do
1510 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1511 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1512 {_, true} <-
1513 {:collections_available,
1514 !!(user_data[:following_address] && user_data[:follower_address])},
1515 {:ok, info} <-
1516 fetch_follow_information_for_user(user_data) do
1517 info = Map.merge(user_data[:info] || %{}, info)
1518
1519 user_data
1520 |> Map.put(:info, info)
1521 else
1522 {:user_type_check, false} ->
1523 user_data
1524
1525 {:collections_available, false} ->
1526 user_data
1527
1528 {:enabled, false} ->
1529 user_data
1530
1531 e ->
1532 Logger.error(
1533 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1534 )
1535
1536 user_data
1537 end
1538 end
1539
1540 defp collection_private(%{"first" => %{"type" => type}})
1541 when type in ["CollectionPage", "OrderedCollectionPage"],
1542 do: {:ok, false}
1543
1544 defp collection_private(%{"first" => first}) do
1545 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1546 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1547 {:ok, false}
1548 else
1549 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1550 {:error, _} = e -> e
1551 e -> {:error, e}
1552 end
1553 end
1554
1555 defp collection_private(_data), do: {:ok, true}
1556
1557 def user_data_from_user_object(data) do
1558 with {:ok, data} <- MRF.filter(data) do
1559 {:ok, object_to_user_data(data)}
1560 else
1561 e -> {:error, e}
1562 end
1563 end
1564
1565 def fetch_and_prepare_user_from_ap_id(ap_id) do
1566 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1567 {:ok, data} <- user_data_from_user_object(data) do
1568 {:ok, maybe_update_follow_information(data)}
1569 else
1570 # If this has been deleted, only log a debug and not an error
1571 {:error, "Object has been deleted" = e} ->
1572 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1573 {:error, e}
1574
1575 {:error, {:reject, reason} = e} ->
1576 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1577 {:error, e}
1578
1579 {:error, e} ->
1580 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1581 {:error, e}
1582 end
1583 end
1584
1585 def maybe_handle_clashing_nickname(data) do
1586 with nickname when is_binary(nickname) <- data[:nickname],
1587 %User{} = old_user <- User.get_by_nickname(nickname),
1588 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1589 Logger.info(
1590 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1591 data[:ap_id]
1592 }, renaming."
1593 )
1594
1595 old_user
1596 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1597 |> User.update_and_set_cache()
1598 else
1599 {:ap_id_comparison, true} ->
1600 Logger.info(
1601 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1602 )
1603
1604 _ ->
1605 nil
1606 end
1607 end
1608
1609 def pin_data_from_featured_collection(%{
1610 "type" => type,
1611 "orderedItems" => objects
1612 })
1613 when type in ["OrderedCollection", "Collection"] do
1614 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1615 end
1616
1617 def fetch_and_prepare_featured_from_ap_id(nil) do
1618 {:ok, %{}}
1619 end
1620
1621 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1622 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1623 {:ok, pin_data_from_featured_collection(data)}
1624 else
1625 e ->
1626 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1627 {:ok, %{}}
1628 end
1629 end
1630
1631 def pinned_fetch_task(nil), do: nil
1632
1633 def pinned_fetch_task(%{pinned_objects: pins}) do
1634 if Enum.all?(pins, fn {ap_id, _} ->
1635 Object.get_cached_by_ap_id(ap_id) ||
1636 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1637 end) do
1638 :ok
1639 else
1640 :error
1641 end
1642 end
1643
1644 def make_user_from_ap_id(ap_id) do
1645 user = User.get_cached_by_ap_id(ap_id)
1646
1647 if user && !User.ap_enabled?(user) do
1648 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1649 else
1650 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1651 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1652
1653 if user do
1654 user
1655 |> User.remote_user_changeset(data)
1656 |> User.update_and_set_cache()
1657 else
1658 maybe_handle_clashing_nickname(data)
1659
1660 data
1661 |> User.remote_user_changeset()
1662 |> Repo.insert()
1663 |> User.set_cache()
1664 end
1665 end
1666 end
1667 end
1668
1669 def make_user_from_nickname(nickname) do
1670 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1671 make_user_from_ap_id(ap_id)
1672 else
1673 _e -> {:error, "No AP id in WebFinger"}
1674 end
1675 end
1676
1677 # filter out broken threads
1678 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1679 entire_thread_visible_for_user?(activity, user)
1680 end
1681
1682 # do post-processing on a specific activity
1683 def contain_activity(%Activity{} = activity, %User{} = user) do
1684 contain_broken_threads(activity, user)
1685 end
1686
1687 def fetch_direct_messages_query do
1688 Activity
1689 |> restrict_type(%{type: "Create"})
1690 |> restrict_visibility(%{visibility: "direct"})
1691 |> order_by([activity], asc: activity.id)
1692 end
1693 end