19961a4a55ea5b9370922d1df63ca0b9330fe00a
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
29
30 import Ecto.Query
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
33
34 require Logger
35 require Pleroma.Constants
36
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
54 {recipients, to, cc}
55 end
56
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
63 _ -> false
64 end
65 end
66
67 defp check_actor_can_insert(_), do: true
68
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
72 end
73
74 defp check_remote_limit(_), do: true
75
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78 end
79
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82 end
83
84 defp increase_replies_count_if_reply(%{
85 "object" => %{"inReplyTo" => reply_ap_id} = object,
86 "type" => "Create"
87 }) do
88 if is_public?(object) do
89 Object.increase_replies_count(reply_ap_id)
90 end
91 end
92
93 defp increase_replies_count_if_reply(_create_data), do: :noop
94
95 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
96 @impl true
97 def persist(%{"type" => type} = object, meta) when type in @object_types do
98 with {:ok, object} <- Object.create(object) do
99 {:ok, object, meta}
100 end
101 end
102
103 @impl true
104 def persist(object, meta) do
105 with local <- Keyword.fetch!(meta, :local),
106 {recipients, _, _} <- get_recipients(object),
107 {:ok, activity} <-
108 Repo.insert(%Activity{
109 data: object,
110 local: local,
111 recipients: recipients,
112 actor: object["actor"]
113 }),
114 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
115 {:ok, _} <- maybe_create_activity_expiration(activity) do
116 {:ok, activity, meta}
117 end
118 end
119
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
125 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map),
131 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
132 # Splice in the child object if we have one.
133 activity = Maps.put_if_present(activity, :object, object)
134
135 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
136 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
137 end)
138
139 {:ok, activity}
140 else
141 %Activity{} = activity ->
142 {:ok, activity}
143
144 {:actor_check, _} ->
145 {:error, false}
146
147 {:containment, _} = error ->
148 error
149
150 {:error, _} = error ->
151 error
152
153 {:fake, true, map, recipients} ->
154 activity = %Activity{
155 data: map,
156 local: local,
157 actor: map["actor"],
158 recipients: recipients,
159 id: "pleroma:fakeid"
160 }
161
162 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
163 {:ok, activity}
164
165 {:remote_limit_pass, _} ->
166 {:error, :remote_limit}
167
168 {:reject, _} = e ->
169 {:error, e}
170 end
171 end
172
173 defp insert_activity_with_expiration(data, local, recipients) do
174 struct = %Activity{
175 data: data,
176 local: local,
177 actor: data["actor"],
178 recipients: recipients
179 }
180
181 with {:ok, activity} <- Repo.insert(struct) do
182 maybe_create_activity_expiration(activity)
183 end
184 end
185
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
188
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
191 stream_out(activity)
192 stream_out_participations(participations)
193 end
194
195 defp maybe_create_activity_expiration(
196 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
197 ) do
198 with {:ok, _job} <-
199 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
200 activity_id: activity.id,
201 expires_at: expires_at
202 }) do
203 {:ok, activity}
204 end
205 end
206
207 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
208
209 defp create_or_bump_conversation(activity, actor) do
210 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
211 %User{} = user <- User.get_cached_by_ap_id(actor) do
212 Participation.mark_as_read(user, conversation)
213 {:ok, conversation}
214 end
215 end
216
217 defp get_participations({:ok, conversation}) do
218 conversation
219 |> Repo.preload(:participations, force: true)
220 |> Map.get(:participations)
221 end
222
223 defp get_participations(_), do: []
224
225 def stream_out_participations(participations) do
226 participations =
227 participations
228 |> Repo.preload(:user)
229
230 Streamer.stream("participation", participations)
231 end
232
233 @impl true
234 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
235 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
236 conversation = Repo.preload(conversation, :participations)
237
238 last_activity_id =
239 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
240 user: user,
241 blocking_user: user
242 })
243
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
246 end
247 end
248 end
249
250 @impl true
251 def stream_out_participations(_, _), do: :noop
252
253 @impl true
254 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
255 when data_type in ["Create", "Announce", "Delete"] do
256 activity
257 |> Topics.get_activity_topics()
258 |> Streamer.stream(activity)
259 end
260
261 @impl true
262 def stream_out(_activity) do
263 :noop
264 end
265
266 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
267 def create(params, fake \\ false) do
268 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
269 result
270 end
271 end
272
273 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
274 additional = params[:additional] || %{}
275 # only accept false as false value
276 local = !(params[:local] == false)
277 published = params[:published]
278 quick_insert? = Config.get([:env]) == :benchmark
279
280 create_data =
281 make_create_data(
282 %{to: to, actor: actor, published: published, context: context, object: object},
283 additional
284 )
285
286 with {:ok, activity} <- insert(create_data, local, fake),
287 {:fake, false, activity} <- {:fake, fake, activity},
288 _ <- increase_replies_count_if_reply(create_data),
289 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
290 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
291 _ <- notify_and_stream(activity),
292 :ok <- maybe_schedule_poll_notifications(activity),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 else
296 {:quick_insert, true, activity} ->
297 {:ok, activity}
298
299 {:fake, true, activity} ->
300 {:ok, activity}
301
302 {:error, message} ->
303 Repo.rollback(message)
304 end
305 end
306
307 defp maybe_schedule_poll_notifications(activity) do
308 PollWorker.schedule_poll_end(activity)
309 :ok
310 end
311
312 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
313 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
314 additional = params[:additional] || %{}
315 # only accept false as false value
316 local = !(params[:local] == false)
317 published = params[:published]
318
319 listen_data =
320 make_listen_data(
321 %{to: to, actor: actor, published: published, context: context, object: object},
322 additional
323 )
324
325 with {:ok, activity} <- insert(listen_data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
328 {:ok, activity}
329 end
330 end
331
332 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
333 {:ok, Activity.t()} | nil | {:error, any()}
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with {:ok, result} <-
336 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
337 result
338 end
339 end
340
341 defp do_unfollow(follower, followed, activity_id, local) do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
344 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
345 {:ok, activity} <- insert(unfollow_data, local),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
348 {:ok, activity}
349 else
350 nil -> nil
351 {:error, error} -> Repo.rollback(error)
352 end
353 end
354
355 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
356 def flag(params) do
357 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
358 result
359 end
360 end
361
362 defp do_flag(
363 %{
364 actor: actor,
365 context: _context,
366 account: account,
367 statuses: statuses,
368 content: content
369 } = params
370 ) do
371 # only accept false as false value
372 local = !(params[:local] == false)
373 forward = !(params[:forward] == false)
374
375 additional = params[:additional] || %{}
376
377 additional =
378 if forward do
379 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
380 else
381 Map.merge(additional, %{"to" => [], "cc" => []})
382 end
383
384 with flag_data <- make_flag_data(params, additional),
385 {:ok, activity} <- insert(flag_data, local),
386 {:ok, stripped_activity} <- strip_report_status_data(activity),
387 _ <- notify_and_stream(activity),
388 :ok <-
389 maybe_federate(stripped_activity) do
390 User.all_superusers()
391 |> Enum.filter(fn user -> user.ap_id != actor end)
392 |> Enum.filter(fn user -> not is_nil(user.email) end)
393 |> Enum.each(fn superuser ->
394 superuser
395 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
396 |> Pleroma.Emails.Mailer.deliver_async()
397 end)
398
399 {:ok, activity}
400 else
401 {:error, error} -> Repo.rollback(error)
402 end
403 end
404
405 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
406 def move(%User{} = origin, %User{} = target, local \\ true) do
407 params = %{
408 "type" => "Move",
409 "actor" => origin.ap_id,
410 "object" => origin.ap_id,
411 "target" => target.ap_id
412 }
413
414 with true <- origin.ap_id in target.also_known_as,
415 {:ok, activity} <- insert(params, local),
416 _ <- notify_and_stream(activity) do
417 maybe_federate(activity)
418
419 BackgroundWorker.enqueue("move_following", %{
420 "origin_id" => origin.id,
421 "target_id" => target.id
422 })
423
424 {:ok, activity}
425 else
426 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
427 err -> err
428 end
429 end
430
431 def fetch_activities_for_context_query(context, opts) do
432 public = [Constants.as_public()]
433
434 recipients =
435 if opts[:user],
436 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
437 else: public
438
439 from(activity in Activity)
440 |> maybe_preload_objects(opts)
441 |> maybe_preload_bookmarks(opts)
442 |> maybe_set_thread_muted_field(opts)
443 |> restrict_blocked(opts)
444 |> restrict_recipients(recipients, opts[:user])
445 |> restrict_filtered(opts)
446 |> where(
447 [activity],
448 fragment(
449 "?->>'type' = ? and ?->>'context' = ?",
450 activity.data,
451 "Create",
452 activity.data,
453 ^context
454 )
455 )
456 |> exclude_poll_votes(opts)
457 |> exclude_id(opts)
458 |> order_by([activity], desc: activity.id)
459 end
460
461 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
462 def fetch_activities_for_context(context, opts \\ %{}) do
463 context
464 |> fetch_activities_for_context_query(opts)
465 |> Repo.all()
466 end
467
468 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
469 FlakeId.Ecto.CompatType.t() | nil
470 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
471 context
472 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
473 |> restrict_visibility(%{visibility: "direct"})
474 |> limit(1)
475 |> select([a], a.id)
476 |> Repo.one()
477 end
478
479 defp fetch_paginated_optimized(query, opts, pagination) do
480 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
481 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
482 opts = Map.put(opts, :skip_extra_order, true)
483
484 Pagination.fetch_paginated(query, opts, pagination)
485 end
486
487 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
488 list_memberships = Pleroma.List.memberships(opts[:user])
489
490 fetch_activities_query(recipients ++ list_memberships, opts)
491 |> fetch_paginated_optimized(opts, pagination)
492 |> Enum.reverse()
493 |> maybe_update_cc(list_memberships, opts[:user])
494 end
495
496 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
497 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
498 opts = Map.delete(opts, :user)
499
500 [Constants.as_public()]
501 |> fetch_activities_query(opts)
502 |> restrict_unlisted(opts)
503 |> fetch_paginated_optimized(opts, pagination)
504 end
505
506 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
507 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
508 opts
509 |> Map.put(:restrict_unlisted, true)
510 |> fetch_public_or_unlisted_activities(pagination)
511 end
512
513 @valid_visibilities ~w[direct unlisted public private]
514
515 defp restrict_visibility(query, %{visibility: visibility})
516 when is_list(visibility) do
517 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
518 from(
519 a in query,
520 where:
521 fragment(
522 "activity_visibility(?, ?, ?) = ANY (?)",
523 a.actor,
524 a.recipients,
525 a.data,
526 ^visibility
527 )
528 )
529 else
530 Logger.error("Could not restrict visibility to #{visibility}")
531 end
532 end
533
534 defp restrict_visibility(query, %{visibility: visibility})
535 when visibility in @valid_visibilities do
536 from(
537 a in query,
538 where:
539 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
540 )
541 end
542
543 defp restrict_visibility(_query, %{visibility: visibility})
544 when visibility not in @valid_visibilities do
545 Logger.error("Could not restrict visibility to #{visibility}")
546 end
547
548 defp restrict_visibility(query, _visibility), do: query
549
550 defp exclude_visibility(query, %{exclude_visibilities: visibility})
551 when is_list(visibility) do
552 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
553 from(
554 a in query,
555 where:
556 not fragment(
557 "activity_visibility(?, ?, ?) = ANY (?)",
558 a.actor,
559 a.recipients,
560 a.data,
561 ^visibility
562 )
563 )
564 else
565 Logger.error("Could not exclude visibility to #{visibility}")
566 query
567 end
568 end
569
570 defp exclude_visibility(query, %{exclude_visibilities: visibility})
571 when visibility in @valid_visibilities do
572 from(
573 a in query,
574 where:
575 not fragment(
576 "activity_visibility(?, ?, ?) = ?",
577 a.actor,
578 a.recipients,
579 a.data,
580 ^visibility
581 )
582 )
583 end
584
585 defp exclude_visibility(query, %{exclude_visibilities: visibility})
586 when visibility not in [nil | @valid_visibilities] do
587 Logger.error("Could not exclude visibility to #{visibility}")
588 query
589 end
590
591 defp exclude_visibility(query, _visibility), do: query
592
593 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
594 do: query
595
596 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
597 do: query
598
599 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
600 from(
601 a in query,
602 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
603 )
604 end
605
606 defp restrict_thread_visibility(query, _, _), do: query
607
608 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
609 params =
610 params
611 |> Map.put(:user, reading_user)
612 |> Map.put(:actor_id, user.ap_id)
613
614 %{
615 godmode: params[:godmode],
616 reading_user: reading_user
617 }
618 |> user_activities_recipients()
619 |> fetch_activities(params)
620 |> Enum.reverse()
621 end
622
623 def fetch_user_activities(user, reading_user, params \\ %{})
624
625 def fetch_user_activities(user, reading_user, %{total: true} = params) do
626 result = fetch_activities_for_user(user, reading_user, params)
627
628 Keyword.put(result, :items, Enum.reverse(result[:items]))
629 end
630
631 def fetch_user_activities(user, reading_user, params) do
632 user
633 |> fetch_activities_for_user(reading_user, params)
634 |> Enum.reverse()
635 end
636
637 defp fetch_activities_for_user(user, reading_user, params) do
638 params =
639 params
640 |> Map.put(:type, ["Create", "Announce"])
641 |> Map.put(:user, reading_user)
642 |> Map.put(:actor_id, user.ap_id)
643 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
644
645 params =
646 if User.blocks?(reading_user, user) do
647 params
648 else
649 params
650 |> Map.put(:blocking_user, reading_user)
651 |> Map.put(:muting_user, reading_user)
652 end
653
654 pagination_type = Map.get(params, :pagination_type) || :keyset
655
656 %{
657 godmode: params[:godmode],
658 reading_user: reading_user
659 }
660 |> user_activities_recipients()
661 |> fetch_activities(params, pagination_type)
662 end
663
664 def fetch_statuses(reading_user, %{total: true} = params) do
665 result = fetch_activities_for_reading_user(reading_user, params)
666 Keyword.put(result, :items, Enum.reverse(result[:items]))
667 end
668
669 def fetch_statuses(reading_user, params) do
670 reading_user
671 |> fetch_activities_for_reading_user(params)
672 |> Enum.reverse()
673 end
674
675 defp fetch_activities_for_reading_user(reading_user, params) do
676 params = Map.put(params, :type, ["Create", "Announce"])
677
678 %{
679 godmode: params[:godmode],
680 reading_user: reading_user
681 }
682 |> user_activities_recipients()
683 |> fetch_activities(params, :offset)
684 end
685
686 defp user_activities_recipients(%{godmode: true}), do: []
687
688 defp user_activities_recipients(%{reading_user: reading_user}) do
689 if reading_user do
690 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
691 else
692 [Constants.as_public()]
693 end
694 end
695
696 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
697 raise "Can't use the child object without preloading!"
698 end
699
700 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
701 from(
702 [activity, object] in query,
703 where:
704 fragment(
705 "?->>'type' != ? or ?->>'actor' != ?",
706 activity.data,
707 "Announce",
708 object.data,
709 ^actor
710 )
711 )
712 end
713
714 defp restrict_announce_object_actor(query, _), do: query
715
716 defp restrict_since(query, %{since_id: ""}), do: query
717
718 defp restrict_since(query, %{since_id: since_id}) do
719 from(activity in query, where: activity.id > ^since_id)
720 end
721
722 defp restrict_since(query, _), do: query
723
724 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
725 raise_on_missing_preload()
726 end
727
728 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
729 from(
730 [_activity, object] in query,
731 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
732 )
733 end
734
735 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
736 restrict_embedded_tag_any(query, %{tag: tag})
737 end
738
739 defp restrict_embedded_tag_all(query, _), do: query
740
741 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
742 raise_on_missing_preload()
743 end
744
745 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
746 from(
747 [_activity, object] in query,
748 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
749 )
750 end
751
752 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
753 restrict_embedded_tag_any(query, %{tag: [tag]})
754 end
755
756 defp restrict_embedded_tag_any(query, _), do: query
757
758 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
759 raise_on_missing_preload()
760 end
761
762 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
763 from(
764 [_activity, object] in query,
765 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
766 )
767 end
768
769 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
770 when is_binary(tag_reject) do
771 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
772 end
773
774 defp restrict_embedded_tag_reject_any(query, _), do: query
775
776 defp object_ids_query_for_tags(tags) do
777 from(hto in "hashtags_objects")
778 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
779 |> where([hto, ht], ht.name in ^tags)
780 |> select([hto], hto.object_id)
781 |> distinct([hto], true)
782 end
783
784 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
785 raise_on_missing_preload()
786 end
787
788 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
789 restrict_hashtag_any(query, %{tag: single_tag})
790 end
791
792 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
793 from(
794 [_activity, object] in query,
795 where:
796 fragment(
797 """
798 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
799 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
800 AND hashtags_objects.object_id = ?) @> ?
801 """,
802 ^tags,
803 object.id,
804 ^tags
805 )
806 )
807 end
808
809 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
810 restrict_hashtag_all(query, %{tag_all: [tag]})
811 end
812
813 defp restrict_hashtag_all(query, _), do: query
814
815 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
816 raise_on_missing_preload()
817 end
818
819 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
820 hashtag_ids =
821 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
822 |> Repo.all()
823
824 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
825 from(
826 [_activity, object] in query,
827 join: hto in "hashtags_objects",
828 on: hto.object_id == object.id,
829 where: hto.hashtag_id in ^hashtag_ids,
830 distinct: [desc: object.id],
831 order_by: [desc: object.id]
832 )
833 end
834
835 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
836 restrict_hashtag_any(query, %{tag: [tag]})
837 end
838
839 defp restrict_hashtag_any(query, _), do: query
840
841 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
842 raise_on_missing_preload()
843 end
844
845 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
846 from(
847 [_activity, object] in query,
848 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
849 )
850 end
851
852 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
853 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
854 end
855
856 defp restrict_hashtag_reject_any(query, _), do: query
857
858 defp raise_on_missing_preload do
859 raise "Can't use the child object without preloading!"
860 end
861
862 defp restrict_recipients(query, [], _user), do: query
863
864 defp restrict_recipients(query, recipients, nil) do
865 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
866 end
867
868 defp restrict_recipients(query, recipients, user) do
869 from(
870 activity in query,
871 where: fragment("? && ?", ^recipients, activity.recipients),
872 or_where: activity.actor == ^user.ap_id
873 )
874 end
875
876 defp restrict_local(query, %{local_only: true}) do
877 from(activity in query, where: activity.local == true)
878 end
879
880 defp restrict_local(query, _), do: query
881
882 defp restrict_remote(query, %{remote: true}) do
883 from(activity in query, where: activity.local == false)
884 end
885
886 defp restrict_remote(query, _), do: query
887
888 defp restrict_actor(query, %{actor_id: actor_id}) do
889 from(activity in query, where: activity.actor == ^actor_id)
890 end
891
892 defp restrict_actor(query, _), do: query
893
894 defp restrict_type(query, %{type: type}) when is_binary(type) do
895 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
896 end
897
898 defp restrict_type(query, %{type: type}) do
899 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
900 end
901
902 defp restrict_type(query, _), do: query
903
904 defp restrict_state(query, %{state: state}) do
905 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
906 end
907
908 defp restrict_state(query, _), do: query
909
910 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
911 from(
912 [_activity, object] in query,
913 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
914 )
915 end
916
917 defp restrict_favorited_by(query, _), do: query
918
919 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
920 raise "Can't use the child object without preloading!"
921 end
922
923 defp restrict_media(query, %{only_media: true}) do
924 from(
925 [activity, object] in query,
926 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
927 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
928 )
929 end
930
931 defp restrict_media(query, _), do: query
932
933 defp restrict_replies(query, %{exclude_replies: true}) do
934 from(
935 [_activity, object] in query,
936 where: fragment("?->>'inReplyTo' is null", object.data)
937 )
938 end
939
940 defp restrict_replies(query, %{
941 reply_filtering_user: %User{} = user,
942 reply_visibility: "self"
943 }) do
944 from(
945 [activity, object] in query,
946 where:
947 fragment(
948 "?->>'inReplyTo' is null OR ? = ANY(?)",
949 object.data,
950 ^user.ap_id,
951 activity.recipients
952 )
953 )
954 end
955
956 defp restrict_replies(query, %{
957 reply_filtering_user: %User{} = user,
958 reply_visibility: "following"
959 }) do
960 from(
961 [activity, object] in query,
962 where:
963 fragment(
964 """
965 ?->>'type' != 'Create' -- This isn't a Create
966 OR ?->>'inReplyTo' is null -- this isn't a reply
967 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
968 -- unless they are the author (because authors
969 -- are also part of the recipients). This leads
970 -- to a bug that self-replies by friends won't
971 -- show up.
972 OR ? = ? -- The actor is us
973 """,
974 activity.data,
975 object.data,
976 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
977 activity.recipients,
978 activity.actor,
979 activity.actor,
980 ^user.ap_id
981 )
982 )
983 end
984
985 defp restrict_replies(query, _), do: query
986
987 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
988 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
989 end
990
991 defp restrict_reblogs(query, _), do: query
992
993 defp restrict_muted(query, %{with_muted: true}), do: query
994
995 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
996 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
997
998 query =
999 from([activity] in query,
1000 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1001 where:
1002 fragment(
1003 "not (?->'to' \\?| ?) or ? = ?",
1004 activity.data,
1005 ^mutes,
1006 activity.actor,
1007 ^user.ap_id
1008 )
1009 )
1010
1011 unless opts[:skip_preload] do
1012 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1013 else
1014 query
1015 end
1016 end
1017
1018 defp restrict_muted(query, _), do: query
1019
1020 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1021 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1022 domain_blocks = user.domain_blocks || []
1023
1024 following_ap_ids = User.get_friends_ap_ids(user)
1025
1026 query =
1027 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1028
1029 from(
1030 [activity, object: o] in query,
1031 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1032 where:
1033 fragment(
1034 "((not (? && ?)) or ? = ?)",
1035 activity.recipients,
1036 ^blocked_ap_ids,
1037 activity.actor,
1038 ^user.ap_id
1039 ),
1040 where:
1041 fragment(
1042 "recipients_contain_blocked_domains(?, ?) = false",
1043 activity.recipients,
1044 ^domain_blocks
1045 ),
1046 where:
1047 fragment(
1048 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1049 activity.data,
1050 activity.data,
1051 ^blocked_ap_ids
1052 ),
1053 where:
1054 fragment(
1055 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1056 activity.actor,
1057 ^domain_blocks,
1058 activity.actor,
1059 ^following_ap_ids
1060 ),
1061 where:
1062 fragment(
1063 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1064 o.data,
1065 ^domain_blocks,
1066 o.data,
1067 ^following_ap_ids
1068 )
1069 )
1070 end
1071
1072 defp restrict_blocked(query, _), do: query
1073
1074 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1075 from(
1076 activity in query,
1077 where:
1078 fragment(
1079 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1080 activity.data,
1081 ^[Constants.as_public()]
1082 )
1083 )
1084 end
1085
1086 defp restrict_unlisted(query, _), do: query
1087
1088 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1089 from(
1090 [activity, object: o] in query,
1091 where:
1092 fragment(
1093 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1094 activity.data,
1095 activity.data,
1096 activity.data,
1097 ^ids
1098 )
1099 )
1100 end
1101
1102 defp restrict_pinned(query, _), do: query
1103
1104 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1105 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1106
1107 from(
1108 activity in query,
1109 where:
1110 fragment(
1111 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1112 activity.data,
1113 activity.actor,
1114 ^muted_reblogs
1115 )
1116 )
1117 end
1118
1119 defp restrict_muted_reblogs(query, _), do: query
1120
1121 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1122 from(
1123 activity in query,
1124 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1125 )
1126 end
1127
1128 defp restrict_instance(query, _), do: query
1129
1130 defp restrict_filtered(query, %{user: %User{} = user}) do
1131 case Filter.compose_regex(user) do
1132 nil ->
1133 query
1134
1135 regex ->
1136 from([activity, object] in query,
1137 where:
1138 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1139 activity.actor == ^user.ap_id
1140 )
1141 end
1142 end
1143
1144 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1145 restrict_filtered(query, %{user: user})
1146 end
1147
1148 defp restrict_filtered(query, _), do: query
1149
1150 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1151
1152 defp exclude_poll_votes(query, _) do
1153 if has_named_binding?(query, :object) do
1154 from([activity, object: o] in query,
1155 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1156 )
1157 else
1158 query
1159 end
1160 end
1161
1162 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1163
1164 defp exclude_chat_messages(query, _) do
1165 if has_named_binding?(query, :object) do
1166 from([activity, object: o] in query,
1167 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1168 )
1169 else
1170 query
1171 end
1172 end
1173
1174 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1175
1176 defp exclude_invisible_actors(query, _opts) do
1177 invisible_ap_ids =
1178 User.Query.build(%{invisible: true, select: [:ap_id]})
1179 |> Repo.all()
1180 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1181
1182 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1183 end
1184
1185 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1186 from(activity in query, where: activity.id != ^id)
1187 end
1188
1189 defp exclude_id(query, _), do: query
1190
1191 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1192
1193 defp maybe_preload_objects(query, _) do
1194 query
1195 |> Activity.with_preloaded_object()
1196 end
1197
1198 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1199
1200 defp maybe_preload_bookmarks(query, opts) do
1201 query
1202 |> Activity.with_preloaded_bookmark(opts[:user])
1203 end
1204
1205 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1206 query
1207 |> Activity.with_preloaded_report_notes()
1208 end
1209
1210 defp maybe_preload_report_notes(query, _), do: query
1211
1212 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1213
1214 defp maybe_set_thread_muted_field(query, opts) do
1215 query
1216 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1217 end
1218
1219 defp maybe_order(query, %{order: :desc}) do
1220 query
1221 |> order_by(desc: :id)
1222 end
1223
1224 defp maybe_order(query, %{order: :asc}) do
1225 query
1226 |> order_by(asc: :id)
1227 end
1228
1229 defp maybe_order(query, _), do: query
1230
1231 defp normalize_fetch_activities_query_opts(opts) do
1232 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1233 case opts[key] do
1234 value when is_bitstring(value) ->
1235 Map.put(opts, key, Hashtag.normalize_name(value))
1236
1237 value when is_list(value) ->
1238 normalized_value =
1239 value
1240 |> Enum.map(&Hashtag.normalize_name/1)
1241 |> Enum.uniq()
1242
1243 Map.put(opts, key, normalized_value)
1244
1245 _ ->
1246 opts
1247 end
1248 end)
1249 end
1250
1251 defp fetch_activities_query_ap_ids_ops(opts) do
1252 source_user = opts[:muting_user]
1253 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1254
1255 ap_id_relationships =
1256 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1257 [:block | ap_id_relationships]
1258 else
1259 ap_id_relationships
1260 end
1261
1262 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1263
1264 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1265 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1266
1267 restrict_muted_reblogs_opts =
1268 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1269
1270 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1271 end
1272
1273 def fetch_activities_query(recipients, opts \\ %{}) do
1274 opts = normalize_fetch_activities_query_opts(opts)
1275
1276 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1277 fetch_activities_query_ap_ids_ops(opts)
1278
1279 config = %{
1280 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1281 }
1282
1283 query =
1284 Activity
1285 |> maybe_preload_objects(opts)
1286 |> maybe_preload_bookmarks(opts)
1287 |> maybe_preload_report_notes(opts)
1288 |> maybe_set_thread_muted_field(opts)
1289 |> maybe_order(opts)
1290 |> restrict_recipients(recipients, opts[:user])
1291 |> restrict_replies(opts)
1292 |> restrict_since(opts)
1293 |> restrict_local(opts)
1294 |> restrict_remote(opts)
1295 |> restrict_actor(opts)
1296 |> restrict_type(opts)
1297 |> restrict_state(opts)
1298 |> restrict_favorited_by(opts)
1299 |> restrict_blocked(restrict_blocked_opts)
1300 |> restrict_muted(restrict_muted_opts)
1301 |> restrict_filtered(opts)
1302 |> restrict_media(opts)
1303 |> restrict_visibility(opts)
1304 |> restrict_thread_visibility(opts, config)
1305 |> restrict_reblogs(opts)
1306 |> restrict_pinned(opts)
1307 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1308 |> restrict_instance(opts)
1309 |> restrict_announce_object_actor(opts)
1310 |> restrict_filtered(opts)
1311 |> Activity.restrict_deactivated_users()
1312 |> exclude_poll_votes(opts)
1313 |> exclude_chat_messages(opts)
1314 |> exclude_invisible_actors(opts)
1315 |> exclude_visibility(opts)
1316
1317 if Config.feature_enabled?(:improved_hashtag_timeline) do
1318 query
1319 |> restrict_hashtag_any(opts)
1320 |> restrict_hashtag_all(opts)
1321 |> restrict_hashtag_reject_any(opts)
1322 else
1323 query
1324 |> restrict_embedded_tag_any(opts)
1325 |> restrict_embedded_tag_all(opts)
1326 |> restrict_embedded_tag_reject_any(opts)
1327 end
1328 end
1329
1330 @doc """
1331 Fetch favorites activities of user with order by sort adds to favorites
1332 """
1333 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1334 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1335 user.ap_id
1336 |> Activity.Queries.by_actor()
1337 |> Activity.Queries.by_type("Like")
1338 |> Activity.with_joined_object()
1339 |> Object.with_joined_activity()
1340 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1341 |> order_by([like, _, _], desc_nulls_last: like.id)
1342 |> Pagination.fetch_paginated(
1343 Map.merge(params, %{skip_order: true}),
1344 pagination
1345 )
1346 end
1347
1348 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1349 Enum.map(activities, fn
1350 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1351 if Enum.any?(bcc, &(&1 in list_memberships)) do
1352 update_in(activity.data["cc"], &[user_ap_id | &1])
1353 else
1354 activity
1355 end
1356
1357 activity ->
1358 activity
1359 end)
1360 end
1361
1362 defp maybe_update_cc(activities, _, _), do: activities
1363
1364 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1365 from(activity in query,
1366 where:
1367 fragment("? && ?", activity.recipients, ^recipients) or
1368 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1369 ^Constants.as_public() in activity.recipients)
1370 )
1371 end
1372
1373 def fetch_activities_bounded(
1374 recipients,
1375 recipients_with_public,
1376 opts \\ %{},
1377 pagination \\ :keyset
1378 ) do
1379 fetch_activities_query([], opts)
1380 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1381 |> Pagination.fetch_paginated(opts, pagination)
1382 |> Enum.reverse()
1383 end
1384
1385 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1386 def upload(file, opts \\ []) do
1387 with {:ok, data} <- Upload.store(file, opts) do
1388 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1389
1390 Repo.insert(%Object{data: obj_data})
1391 end
1392 end
1393
1394 @spec get_actor_url(any()) :: binary() | nil
1395 defp get_actor_url(url) when is_binary(url), do: url
1396 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1397
1398 defp get_actor_url(url) when is_list(url) do
1399 url
1400 |> List.first()
1401 |> get_actor_url()
1402 end
1403
1404 defp get_actor_url(_url), do: nil
1405
1406 defp normalize_image(%{"url" => url}) do
1407 %{
1408 "type" => "Image",
1409 "url" => [%{"href" => url}]
1410 }
1411 end
1412
1413 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1414 defp normalize_image(_), do: nil
1415
1416 defp object_to_user_data(data) do
1417 fields =
1418 data
1419 |> Map.get("attachment", [])
1420 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1421 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1422
1423 emojis =
1424 data
1425 |> Map.get("tag", [])
1426 |> Enum.filter(fn
1427 %{"type" => "Emoji"} -> true
1428 _ -> false
1429 end)
1430 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1431 {String.trim(name, ":"), url}
1432 end)
1433
1434 is_locked = data["manuallyApprovesFollowers"] || false
1435 capabilities = data["capabilities"] || %{}
1436 accepts_chat_messages = capabilities["acceptsChatMessages"]
1437 data = Transmogrifier.maybe_fix_user_object(data)
1438 is_discoverable = data["discoverable"] || false
1439 invisible = data["invisible"] || false
1440 actor_type = data["type"] || "Person"
1441
1442 featured_address = data["featured"]
1443 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1444
1445 public_key =
1446 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1447 data["publicKey"]["publicKeyPem"]
1448 else
1449 nil
1450 end
1451
1452 shared_inbox =
1453 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1454 data["endpoints"]["sharedInbox"]
1455 else
1456 nil
1457 end
1458
1459 user_data = %{
1460 ap_id: data["id"],
1461 uri: get_actor_url(data["url"]),
1462 ap_enabled: true,
1463 banner: normalize_image(data["image"]),
1464 fields: fields,
1465 emoji: emojis,
1466 is_locked: is_locked,
1467 is_discoverable: is_discoverable,
1468 invisible: invisible,
1469 avatar: normalize_image(data["icon"]),
1470 name: data["name"],
1471 follower_address: data["followers"],
1472 following_address: data["following"],
1473 featured_address: featured_address,
1474 bio: data["summary"] || "",
1475 actor_type: actor_type,
1476 also_known_as: Map.get(data, "alsoKnownAs", []),
1477 public_key: public_key,
1478 inbox: data["inbox"],
1479 shared_inbox: shared_inbox,
1480 accepts_chat_messages: accepts_chat_messages,
1481 pinned_objects: pinned_objects
1482 }
1483
1484 # nickname can be nil because of virtual actors
1485 if data["preferredUsername"] do
1486 Map.put(
1487 user_data,
1488 :nickname,
1489 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1490 )
1491 else
1492 Map.put(user_data, :nickname, nil)
1493 end
1494 end
1495
1496 def fetch_follow_information_for_user(user) do
1497 with {:ok, following_data} <-
1498 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1499 {:ok, hide_follows} <- collection_private(following_data),
1500 {:ok, followers_data} <-
1501 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1502 {:ok, hide_followers} <- collection_private(followers_data) do
1503 {:ok,
1504 %{
1505 hide_follows: hide_follows,
1506 follower_count: normalize_counter(followers_data["totalItems"]),
1507 following_count: normalize_counter(following_data["totalItems"]),
1508 hide_followers: hide_followers
1509 }}
1510 else
1511 {:error, _} = e -> e
1512 e -> {:error, e}
1513 end
1514 end
1515
1516 defp normalize_counter(counter) when is_integer(counter), do: counter
1517 defp normalize_counter(_), do: 0
1518
1519 def maybe_update_follow_information(user_data) do
1520 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1521 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1522 {_, true} <-
1523 {:collections_available,
1524 !!(user_data[:following_address] && user_data[:follower_address])},
1525 {:ok, info} <-
1526 fetch_follow_information_for_user(user_data) do
1527 info = Map.merge(user_data[:info] || %{}, info)
1528
1529 user_data
1530 |> Map.put(:info, info)
1531 else
1532 {:user_type_check, false} ->
1533 user_data
1534
1535 {:collections_available, false} ->
1536 user_data
1537
1538 {:enabled, false} ->
1539 user_data
1540
1541 e ->
1542 Logger.error(
1543 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1544 )
1545
1546 user_data
1547 end
1548 end
1549
1550 defp collection_private(%{"first" => %{"type" => type}})
1551 when type in ["CollectionPage", "OrderedCollectionPage"],
1552 do: {:ok, false}
1553
1554 defp collection_private(%{"first" => first}) do
1555 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1556 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1557 {:ok, false}
1558 else
1559 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1560 {:error, _} = e -> e
1561 e -> {:error, e}
1562 end
1563 end
1564
1565 defp collection_private(_data), do: {:ok, true}
1566
1567 def user_data_from_user_object(data) do
1568 with {:ok, data} <- MRF.filter(data) do
1569 {:ok, object_to_user_data(data)}
1570 else
1571 e -> {:error, e}
1572 end
1573 end
1574
1575 def fetch_and_prepare_user_from_ap_id(ap_id) do
1576 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1577 {:ok, data} <- user_data_from_user_object(data) do
1578 {:ok, maybe_update_follow_information(data)}
1579 else
1580 # If this has been deleted, only log a debug and not an error
1581 {:error, "Object has been deleted" = e} ->
1582 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1583 {:error, e}
1584
1585 {:error, {:reject, reason} = e} ->
1586 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1587 {:error, e}
1588
1589 {:error, e} ->
1590 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1591 {:error, e}
1592 end
1593 end
1594
1595 def maybe_handle_clashing_nickname(data) do
1596 with nickname when is_binary(nickname) <- data[:nickname],
1597 %User{} = old_user <- User.get_by_nickname(nickname),
1598 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1599 Logger.info(
1600 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1601 data[:ap_id]
1602 }, renaming."
1603 )
1604
1605 old_user
1606 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1607 |> User.update_and_set_cache()
1608 else
1609 {:ap_id_comparison, true} ->
1610 Logger.info(
1611 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1612 )
1613
1614 _ ->
1615 nil
1616 end
1617 end
1618
1619 def pin_data_from_featured_collection(%{
1620 "type" => type,
1621 "orderedItems" => objects
1622 })
1623 when type in ["OrderedCollection", "Collection"] do
1624 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1625 end
1626
1627 def fetch_and_prepare_featured_from_ap_id(nil) do
1628 {:ok, %{}}
1629 end
1630
1631 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1632 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1633 {:ok, pin_data_from_featured_collection(data)}
1634 else
1635 e ->
1636 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1637 {:ok, %{}}
1638 end
1639 end
1640
1641 def pinned_fetch_task(nil), do: nil
1642
1643 def pinned_fetch_task(%{pinned_objects: pins}) do
1644 if Enum.all?(pins, fn {ap_id, _} ->
1645 Object.get_cached_by_ap_id(ap_id) ||
1646 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1647 end) do
1648 :ok
1649 else
1650 :error
1651 end
1652 end
1653
1654 def make_user_from_ap_id(ap_id) do
1655 user = User.get_cached_by_ap_id(ap_id)
1656
1657 if user && !User.ap_enabled?(user) do
1658 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1659 else
1660 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1661 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1662
1663 if user do
1664 user
1665 |> User.remote_user_changeset(data)
1666 |> User.update_and_set_cache()
1667 else
1668 maybe_handle_clashing_nickname(data)
1669
1670 data
1671 |> User.remote_user_changeset()
1672 |> Repo.insert()
1673 |> User.set_cache()
1674 end
1675 end
1676 end
1677 end
1678
1679 def make_user_from_nickname(nickname) do
1680 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1681 make_user_from_ap_id(ap_id)
1682 else
1683 _e -> {:error, "No AP id in WebFinger"}
1684 end
1685 end
1686
1687 # filter out broken threads
1688 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1689 entire_thread_visible_for_user?(activity, user)
1690 end
1691
1692 # do post-processing on a specific activity
1693 def contain_activity(%Activity{} = activity, %User{} = user) do
1694 contain_broken_threads(activity, user)
1695 end
1696
1697 def fetch_direct_messages_query do
1698 Activity
1699 |> restrict_type(%{type: "Create"})
1700 |> restrict_visibility(%{visibility: "direct"})
1701 |> order_by([activity], asc: activity.id)
1702 end
1703 end