8324ca22c80bcedace3bd1a2d1f8985d941c0945
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
29
30 import Ecto.Query
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
33
34 require Logger
35 require Pleroma.Constants
36
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
54 {recipients, to, cc}
55 end
56
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
63 _ -> false
64 end
65 end
66
67 defp check_actor_can_insert(_), do: true
68
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
72 end
73
74 defp check_remote_limit(_), do: true
75
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78 end
79
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82 end
83
84 defp increase_replies_count_if_reply(%{
85 "object" => %{"inReplyTo" => reply_ap_id} = object,
86 "type" => "Create"
87 }) do
88 if is_public?(object) do
89 Object.increase_replies_count(reply_ap_id)
90 end
91 end
92
93 defp increase_replies_count_if_reply(_create_data), do: :noop
94
95 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
96 @impl true
97 def persist(%{"type" => type} = object, meta) when type in @object_types do
98 with {:ok, object} <- Object.create(object) do
99 {:ok, object, meta}
100 end
101 end
102
103 @impl true
104 def persist(object, meta) do
105 with local <- Keyword.fetch!(meta, :local),
106 {recipients, _, _} <- get_recipients(object),
107 {:ok, activity} <-
108 Repo.insert(%Activity{
109 data: object,
110 local: local,
111 recipients: recipients,
112 actor: object["actor"]
113 }),
114 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
115 {:ok, _} <- maybe_create_activity_expiration(activity) do
116 {:ok, activity, meta}
117 end
118 end
119
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
125 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map),
131 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
132 # Splice in the child object if we have one.
133 activity = Maps.put_if_present(activity, :object, object)
134
135 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
136 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
137 end)
138
139 {:ok, activity}
140 else
141 %Activity{} = activity ->
142 {:ok, activity}
143
144 {:actor_check, _} ->
145 {:error, false}
146
147 {:containment, _} = error ->
148 error
149
150 {:error, _} = error ->
151 error
152
153 {:fake, true, map, recipients} ->
154 activity = %Activity{
155 data: map,
156 local: local,
157 actor: map["actor"],
158 recipients: recipients,
159 id: "pleroma:fakeid"
160 }
161
162 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
163 {:ok, activity}
164
165 {:remote_limit_pass, _} ->
166 {:error, :remote_limit}
167
168 {:reject, _} = e ->
169 {:error, e}
170 end
171 end
172
173 defp insert_activity_with_expiration(data, local, recipients) do
174 struct = %Activity{
175 data: data,
176 local: local,
177 actor: data["actor"],
178 recipients: recipients
179 }
180
181 with {:ok, activity} <- Repo.insert(struct) do
182 maybe_create_activity_expiration(activity)
183 end
184 end
185
186 def notify_and_stream(activity) do
187 Notification.create_notifications(activity)
188
189 conversation = create_or_bump_conversation(activity, activity.actor)
190 participations = get_participations(conversation)
191 stream_out(activity)
192 stream_out_participations(participations)
193 end
194
195 defp maybe_create_activity_expiration(
196 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
197 ) do
198 with {:ok, _job} <-
199 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
200 activity_id: activity.id,
201 expires_at: expires_at
202 }) do
203 {:ok, activity}
204 end
205 end
206
207 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
208
209 defp create_or_bump_conversation(activity, actor) do
210 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
211 %User{} = user <- User.get_cached_by_ap_id(actor) do
212 Participation.mark_as_read(user, conversation)
213 {:ok, conversation}
214 end
215 end
216
217 defp get_participations({:ok, conversation}) do
218 conversation
219 |> Repo.preload(:participations, force: true)
220 |> Map.get(:participations)
221 end
222
223 defp get_participations(_), do: []
224
225 def stream_out_participations(participations) do
226 participations =
227 participations
228 |> Repo.preload(:user)
229
230 Streamer.stream("participation", participations)
231 end
232
233 @impl true
234 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
235 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
236 conversation = Repo.preload(conversation, :participations)
237
238 last_activity_id =
239 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
240 user: user,
241 blocking_user: user
242 })
243
244 if last_activity_id do
245 stream_out_participations(conversation.participations)
246 end
247 end
248 end
249
250 @impl true
251 def stream_out_participations(_, _), do: :noop
252
253 @impl true
254 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
255 when data_type in ["Create", "Announce", "Delete"] do
256 activity
257 |> Topics.get_activity_topics()
258 |> Streamer.stream(activity)
259 end
260
261 @impl true
262 def stream_out(_activity) do
263 :noop
264 end
265
266 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
267 def create(params, fake \\ false) do
268 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
269 result
270 end
271 end
272
273 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
274 additional = params[:additional] || %{}
275 # only accept false as false value
276 local = !(params[:local] == false)
277 published = params[:published]
278 quick_insert? = Config.get([:env]) == :benchmark
279
280 create_data =
281 make_create_data(
282 %{to: to, actor: actor, published: published, context: context, object: object},
283 additional
284 )
285
286 with {:ok, activity} <- insert(create_data, local, fake),
287 {:fake, false, activity} <- {:fake, fake, activity},
288 _ <- increase_replies_count_if_reply(create_data),
289 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
290 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
291 _ <- notify_and_stream(activity),
292 :ok <- maybe_schedule_poll_notifications(activity),
293 :ok <- maybe_federate(activity) do
294 {:ok, activity}
295 else
296 {:quick_insert, true, activity} ->
297 {:ok, activity}
298
299 {:fake, true, activity} ->
300 {:ok, activity}
301
302 {:error, message} ->
303 Repo.rollback(message)
304 end
305 end
306
307 defp maybe_schedule_poll_notifications(activity) do
308 PollWorker.schedule_poll_end(activity)
309 :ok
310 end
311
312 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
313 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
314 additional = params[:additional] || %{}
315 # only accept false as false value
316 local = !(params[:local] == false)
317 published = params[:published]
318
319 listen_data =
320 make_listen_data(
321 %{to: to, actor: actor, published: published, context: context, object: object},
322 additional
323 )
324
325 with {:ok, activity} <- insert(listen_data, local),
326 _ <- notify_and_stream(activity),
327 :ok <- maybe_federate(activity) do
328 {:ok, activity}
329 end
330 end
331
332 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
333 {:ok, Activity.t()} | nil | {:error, any()}
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with {:ok, result} <-
336 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
337 result
338 end
339 end
340
341 defp do_unfollow(follower, followed, activity_id, local) do
342 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
343 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
344 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
345 {:ok, activity} <- insert(unfollow_data, local),
346 _ <- notify_and_stream(activity),
347 :ok <- maybe_federate(activity) do
348 {:ok, activity}
349 else
350 nil -> nil
351 {:error, error} -> Repo.rollback(error)
352 end
353 end
354
355 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
356 def flag(params) do
357 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
358 result
359 end
360 end
361
362 defp do_flag(
363 %{
364 actor: actor,
365 context: _context,
366 account: account,
367 statuses: statuses,
368 content: content
369 } = params
370 ) do
371 # only accept false as false value
372 local = !(params[:local] == false)
373 forward = !(params[:forward] == false)
374
375 additional = params[:additional] || %{}
376
377 additional =
378 if forward do
379 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
380 else
381 Map.merge(additional, %{"to" => [], "cc" => []})
382 end
383
384 with flag_data <- make_flag_data(params, additional),
385 {:ok, activity} <- insert(flag_data, local),
386 {:ok, stripped_activity} <- strip_report_status_data(activity),
387 _ <- notify_and_stream(activity),
388 :ok <-
389 maybe_federate(stripped_activity) do
390 User.all_superusers()
391 |> Enum.filter(fn user -> user.ap_id != actor end)
392 |> Enum.filter(fn user -> not is_nil(user.email) end)
393 |> Enum.each(fn superuser ->
394 superuser
395 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
396 |> Pleroma.Emails.Mailer.deliver_async()
397 end)
398
399 {:ok, activity}
400 else
401 {:error, error} -> Repo.rollback(error)
402 end
403 end
404
405 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
406 def move(%User{} = origin, %User{} = target, local \\ true) do
407 params = %{
408 "type" => "Move",
409 "actor" => origin.ap_id,
410 "object" => origin.ap_id,
411 "target" => target.ap_id
412 }
413
414 with true <- origin.ap_id in target.also_known_as,
415 {:ok, activity} <- insert(params, local),
416 _ <- notify_and_stream(activity) do
417 maybe_federate(activity)
418
419 BackgroundWorker.enqueue("move_following", %{
420 "origin_id" => origin.id,
421 "target_id" => target.id
422 })
423
424 {:ok, activity}
425 else
426 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
427 err -> err
428 end
429 end
430
431 def fetch_activities_for_context_query(context, opts) do
432 public = [Constants.as_public()]
433
434 recipients =
435 if opts[:user],
436 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
437 else: public
438
439 from(activity in Activity)
440 |> maybe_preload_objects(opts)
441 |> maybe_preload_bookmarks(opts)
442 |> maybe_set_thread_muted_field(opts)
443 |> restrict_blocked(opts)
444 |> restrict_blockers_visibility(opts)
445 |> restrict_recipients(recipients, opts[:user])
446 |> restrict_filtered(opts)
447 |> where(
448 [activity],
449 fragment(
450 "?->>'type' = ? and ?->>'context' = ?",
451 activity.data,
452 "Create",
453 activity.data,
454 ^context
455 )
456 )
457 |> exclude_poll_votes(opts)
458 |> exclude_id(opts)
459 |> order_by([activity], desc: activity.id)
460 end
461
462 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
463 def fetch_activities_for_context(context, opts \\ %{}) do
464 context
465 |> fetch_activities_for_context_query(opts)
466 |> Repo.all()
467 end
468
469 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
470 FlakeId.Ecto.CompatType.t() | nil
471 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
472 context
473 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
474 |> restrict_visibility(%{visibility: "direct"})
475 |> limit(1)
476 |> select([a], a.id)
477 |> Repo.one()
478 end
479
480 defp fetch_paginated_optimized(query, opts, pagination) do
481 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
482 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
483 opts = Map.put(opts, :skip_extra_order, true)
484
485 Pagination.fetch_paginated(query, opts, pagination)
486 end
487
488 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
489 list_memberships = Pleroma.List.memberships(opts[:user])
490
491 fetch_activities_query(recipients ++ list_memberships, opts)
492 |> fetch_paginated_optimized(opts, pagination)
493 |> Enum.reverse()
494 |> maybe_update_cc(list_memberships, opts[:user])
495 end
496
497 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
498 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
499 opts = Map.delete(opts, :user)
500
501 [Constants.as_public()]
502 |> fetch_activities_query(opts)
503 |> restrict_unlisted(opts)
504 |> fetch_paginated_optimized(opts, pagination)
505 end
506
507 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
508 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
509 opts
510 |> Map.put(:restrict_unlisted, true)
511 |> fetch_public_or_unlisted_activities(pagination)
512 end
513
514 @valid_visibilities ~w[direct unlisted public private]
515
516 defp restrict_visibility(query, %{visibility: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
519 from(
520 a in query,
521 where:
522 fragment(
523 "activity_visibility(?, ?, ?) = ANY (?)",
524 a.actor,
525 a.recipients,
526 a.data,
527 ^visibility
528 )
529 )
530 else
531 Logger.error("Could not restrict visibility to #{visibility}")
532 end
533 end
534
535 defp restrict_visibility(query, %{visibility: visibility})
536 when visibility in @valid_visibilities do
537 from(
538 a in query,
539 where:
540 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
541 )
542 end
543
544 defp restrict_visibility(_query, %{visibility: visibility})
545 when visibility not in @valid_visibilities do
546 Logger.error("Could not restrict visibility to #{visibility}")
547 end
548
549 defp restrict_visibility(query, _visibility), do: query
550
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when is_list(visibility) do
553 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
554 from(
555 a in query,
556 where:
557 not fragment(
558 "activity_visibility(?, ?, ?) = ANY (?)",
559 a.actor,
560 a.recipients,
561 a.data,
562 ^visibility
563 )
564 )
565 else
566 Logger.error("Could not exclude visibility to #{visibility}")
567 query
568 end
569 end
570
571 defp exclude_visibility(query, %{exclude_visibilities: visibility})
572 when visibility in @valid_visibilities do
573 from(
574 a in query,
575 where:
576 not fragment(
577 "activity_visibility(?, ?, ?) = ?",
578 a.actor,
579 a.recipients,
580 a.data,
581 ^visibility
582 )
583 )
584 end
585
586 defp exclude_visibility(query, %{exclude_visibilities: visibility})
587 when visibility not in [nil | @valid_visibilities] do
588 Logger.error("Could not exclude visibility to #{visibility}")
589 query
590 end
591
592 defp exclude_visibility(query, _visibility), do: query
593
594 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
595 do: query
596
597 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
598 do: query
599
600 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
601 from(
602 a in query,
603 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
604 )
605 end
606
607 defp restrict_thread_visibility(query, _, _), do: query
608
609 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
610 params =
611 params
612 |> Map.put(:user, reading_user)
613 |> Map.put(:actor_id, user.ap_id)
614
615 %{
616 godmode: params[:godmode],
617 reading_user: reading_user
618 }
619 |> user_activities_recipients()
620 |> fetch_activities(params)
621 |> Enum.reverse()
622 end
623
624 def fetch_user_activities(user, reading_user, params \\ %{})
625
626 def fetch_user_activities(user, reading_user, %{total: true} = params) do
627 result = fetch_activities_for_user(user, reading_user, params)
628
629 Keyword.put(result, :items, Enum.reverse(result[:items]))
630 end
631
632 def fetch_user_activities(user, reading_user, params) do
633 user
634 |> fetch_activities_for_user(reading_user, params)
635 |> Enum.reverse()
636 end
637
638 defp fetch_activities_for_user(user, reading_user, params) do
639 params =
640 params
641 |> Map.put(:type, ["Create", "Announce"])
642 |> Map.put(:user, reading_user)
643 |> Map.put(:actor_id, user.ap_id)
644 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
645
646 params =
647 if User.blocks?(reading_user, user) do
648 params
649 else
650 params
651 |> Map.put(:blocking_user, reading_user)
652 |> Map.put(:muting_user, reading_user)
653 end
654
655 pagination_type = Map.get(params, :pagination_type) || :keyset
656
657 %{
658 godmode: params[:godmode],
659 reading_user: reading_user
660 }
661 |> user_activities_recipients()
662 |> fetch_activities(params, pagination_type)
663 end
664
665 def fetch_statuses(reading_user, %{total: true} = params) do
666 result = fetch_activities_for_reading_user(reading_user, params)
667 Keyword.put(result, :items, Enum.reverse(result[:items]))
668 end
669
670 def fetch_statuses(reading_user, params) do
671 reading_user
672 |> fetch_activities_for_reading_user(params)
673 |> Enum.reverse()
674 end
675
676 defp fetch_activities_for_reading_user(reading_user, params) do
677 params = Map.put(params, :type, ["Create", "Announce"])
678
679 %{
680 godmode: params[:godmode],
681 reading_user: reading_user
682 }
683 |> user_activities_recipients()
684 |> fetch_activities(params, :offset)
685 end
686
687 defp user_activities_recipients(%{godmode: true}), do: []
688
689 defp user_activities_recipients(%{reading_user: reading_user}) do
690 if reading_user do
691 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
692 else
693 [Constants.as_public()]
694 end
695 end
696
697 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
698 raise "Can't use the child object without preloading!"
699 end
700
701 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
702 from(
703 [activity, object] in query,
704 where:
705 fragment(
706 "?->>'type' != ? or ?->>'actor' != ?",
707 activity.data,
708 "Announce",
709 object.data,
710 ^actor
711 )
712 )
713 end
714
715 defp restrict_announce_object_actor(query, _), do: query
716
717 defp restrict_since(query, %{since_id: ""}), do: query
718
719 defp restrict_since(query, %{since_id: since_id}) do
720 from(activity in query, where: activity.id > ^since_id)
721 end
722
723 defp restrict_since(query, _), do: query
724
725 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
726 raise_on_missing_preload()
727 end
728
729 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
730 from(
731 [_activity, object] in query,
732 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
733 )
734 end
735
736 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
737 restrict_embedded_tag_any(query, %{tag: tag})
738 end
739
740 defp restrict_embedded_tag_all(query, _), do: query
741
742 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
743 raise_on_missing_preload()
744 end
745
746 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
747 from(
748 [_activity, object] in query,
749 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
750 )
751 end
752
753 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
754 restrict_embedded_tag_any(query, %{tag: [tag]})
755 end
756
757 defp restrict_embedded_tag_any(query, _), do: query
758
759 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
760 raise_on_missing_preload()
761 end
762
763 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
764 from(
765 [_activity, object] in query,
766 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
767 )
768 end
769
770 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
771 when is_binary(tag_reject) do
772 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
773 end
774
775 defp restrict_embedded_tag_reject_any(query, _), do: query
776
777 defp object_ids_query_for_tags(tags) do
778 from(hto in "hashtags_objects")
779 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
780 |> where([hto, ht], ht.name in ^tags)
781 |> select([hto], hto.object_id)
782 |> distinct([hto], true)
783 end
784
785 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
786 raise_on_missing_preload()
787 end
788
789 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
790 restrict_hashtag_any(query, %{tag: single_tag})
791 end
792
793 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
794 from(
795 [_activity, object] in query,
796 where:
797 fragment(
798 """
799 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
800 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
801 AND hashtags_objects.object_id = ?) @> ?
802 """,
803 ^tags,
804 object.id,
805 ^tags
806 )
807 )
808 end
809
810 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
811 restrict_hashtag_all(query, %{tag_all: [tag]})
812 end
813
814 defp restrict_hashtag_all(query, _), do: query
815
816 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
817 raise_on_missing_preload()
818 end
819
820 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
821 hashtag_ids =
822 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
823 |> Repo.all()
824
825 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
826 from(
827 [_activity, object] in query,
828 join: hto in "hashtags_objects",
829 on: hto.object_id == object.id,
830 where: hto.hashtag_id in ^hashtag_ids,
831 distinct: [desc: object.id],
832 order_by: [desc: object.id]
833 )
834 end
835
836 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
837 restrict_hashtag_any(query, %{tag: [tag]})
838 end
839
840 defp restrict_hashtag_any(query, _), do: query
841
842 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
843 raise_on_missing_preload()
844 end
845
846 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
847 from(
848 [_activity, object] in query,
849 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
850 )
851 end
852
853 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
854 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
855 end
856
857 defp restrict_hashtag_reject_any(query, _), do: query
858
859 defp raise_on_missing_preload do
860 raise "Can't use the child object without preloading!"
861 end
862
863 defp restrict_recipients(query, [], _user), do: query
864
865 defp restrict_recipients(query, recipients, nil) do
866 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
867 end
868
869 defp restrict_recipients(query, recipients, user) do
870 from(
871 activity in query,
872 where: fragment("? && ?", ^recipients, activity.recipients),
873 or_where: activity.actor == ^user.ap_id
874 )
875 end
876
877 defp restrict_local(query, %{local_only: true}) do
878 from(activity in query, where: activity.local == true)
879 end
880
881 defp restrict_local(query, _), do: query
882
883 defp restrict_remote(query, %{remote: true}) do
884 from(activity in query, where: activity.local == false)
885 end
886
887 defp restrict_remote(query, _), do: query
888
889 defp restrict_actor(query, %{actor_id: actor_id}) do
890 from(activity in query, where: activity.actor == ^actor_id)
891 end
892
893 defp restrict_actor(query, _), do: query
894
895 defp restrict_type(query, %{type: type}) when is_binary(type) do
896 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
897 end
898
899 defp restrict_type(query, %{type: type}) do
900 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
901 end
902
903 defp restrict_type(query, _), do: query
904
905 defp restrict_state(query, %{state: state}) do
906 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
907 end
908
909 defp restrict_state(query, _), do: query
910
911 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
912 from(
913 [_activity, object] in query,
914 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
915 )
916 end
917
918 defp restrict_favorited_by(query, _), do: query
919
920 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
921 raise "Can't use the child object without preloading!"
922 end
923
924 defp restrict_media(query, %{only_media: true}) do
925 from(
926 [activity, object] in query,
927 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
928 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
929 )
930 end
931
932 defp restrict_media(query, _), do: query
933
934 defp restrict_replies(query, %{exclude_replies: true}) do
935 from(
936 [_activity, object] in query,
937 where: fragment("?->>'inReplyTo' is null", object.data)
938 )
939 end
940
941 defp restrict_replies(query, %{
942 reply_filtering_user: %User{} = user,
943 reply_visibility: "self"
944 }) do
945 from(
946 [activity, object] in query,
947 where:
948 fragment(
949 "?->>'inReplyTo' is null OR ? = ANY(?)",
950 object.data,
951 ^user.ap_id,
952 activity.recipients
953 )
954 )
955 end
956
957 defp restrict_replies(query, %{
958 reply_filtering_user: %User{} = user,
959 reply_visibility: "following"
960 }) do
961 from(
962 [activity, object] in query,
963 where:
964 fragment(
965 """
966 ?->>'type' != 'Create' -- This isn't a Create
967 OR ?->>'inReplyTo' is null -- this isn't a reply
968 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
969 -- unless they are the author (because authors
970 -- are also part of the recipients). This leads
971 -- to a bug that self-replies by friends won't
972 -- show up.
973 OR ? = ? -- The actor is us
974 """,
975 activity.data,
976 object.data,
977 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
978 activity.recipients,
979 activity.actor,
980 activity.actor,
981 ^user.ap_id
982 )
983 )
984 end
985
986 defp restrict_replies(query, _), do: query
987
988 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
989 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
990 end
991
992 defp restrict_reblogs(query, _), do: query
993
994 defp restrict_muted(query, %{with_muted: true}), do: query
995
996 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
997 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
998
999 query =
1000 from([activity] in query,
1001 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1002 where:
1003 fragment(
1004 "not (?->'to' \\?| ?) or ? = ?",
1005 activity.data,
1006 ^mutes,
1007 activity.actor,
1008 ^user.ap_id
1009 )
1010 )
1011
1012 unless opts[:skip_preload] do
1013 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1014 else
1015 query
1016 end
1017 end
1018
1019 defp restrict_muted(query, _), do: query
1020
1021 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1022 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1023 domain_blocks = user.domain_blocks || []
1024
1025 following_ap_ids = User.get_friends_ap_ids(user)
1026
1027 query =
1028 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1029
1030 from(
1031 [activity, object: o] in query,
1032 # You don't block the author
1033 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1034
1035 # You don't block any recipients, and didn't author the post
1036 where:
1037 fragment(
1038 "((not (? && ?)) or ? = ?)",
1039 activity.recipients,
1040 ^blocked_ap_ids,
1041 activity.actor,
1042 ^user.ap_id
1043 ),
1044
1045 # You don't block the domain of any recipients, and didn't author the post
1046 where:
1047 fragment(
1048 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1049 activity.recipients,
1050 ^domain_blocks,
1051 activity.actor,
1052 ^user.ap_id
1053 ),
1054
1055 # It's not a boost of a user you block
1056 where:
1057 fragment(
1058 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1059 activity.data,
1060 activity.data,
1061 ^blocked_ap_ids
1062 ),
1063
1064 # You don't block the author's domain, and also don't follow the author
1065 where:
1066 fragment(
1067 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1068 activity.actor,
1069 ^domain_blocks,
1070 activity.actor,
1071 ^following_ap_ids
1072 ),
1073
1074 # Same as above, but checks the Object
1075 where:
1076 fragment(
1077 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1078 o.data,
1079 ^domain_blocks,
1080 o.data,
1081 ^following_ap_ids
1082 )
1083 )
1084 end
1085
1086 defp restrict_blocked(query, _), do: query
1087
1088 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1089 if Config.get([:activitypub, :blockers_visible]) == true do
1090 query
1091 else
1092 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1093
1094 from(
1095 activity in query,
1096 # The author doesn't block you
1097 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1098
1099 # It's not a boost of a user that blocks you
1100 where:
1101 fragment(
1102 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1103 activity.data,
1104 activity.data,
1105 ^blocker_ap_ids
1106 )
1107 )
1108 end
1109 end
1110
1111 defp restrict_blockers_visibility(query, _), do: query
1112
1113 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1114 from(
1115 activity in query,
1116 where:
1117 fragment(
1118 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1119 activity.data,
1120 ^[Constants.as_public()]
1121 )
1122 )
1123 end
1124
1125 defp restrict_unlisted(query, _), do: query
1126
1127 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1128 from(
1129 [activity, object: o] in query,
1130 where:
1131 fragment(
1132 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1133 activity.data,
1134 activity.data,
1135 activity.data,
1136 ^ids
1137 )
1138 )
1139 end
1140
1141 defp restrict_pinned(query, _), do: query
1142
1143 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1144 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1145
1146 from(
1147 activity in query,
1148 where:
1149 fragment(
1150 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1151 activity.data,
1152 activity.actor,
1153 ^muted_reblogs
1154 )
1155 )
1156 end
1157
1158 defp restrict_muted_reblogs(query, _), do: query
1159
1160 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1161 from(
1162 activity in query,
1163 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1164 )
1165 end
1166
1167 defp restrict_instance(query, _), do: query
1168
1169 defp restrict_filtered(query, %{user: %User{} = user}) do
1170 case Filter.compose_regex(user) do
1171 nil ->
1172 query
1173
1174 regex ->
1175 from([activity, object] in query,
1176 where:
1177 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1178 activity.actor == ^user.ap_id
1179 )
1180 end
1181 end
1182
1183 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1184 restrict_filtered(query, %{user: user})
1185 end
1186
1187 defp restrict_filtered(query, _), do: query
1188
1189 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1190
1191 defp exclude_poll_votes(query, _) do
1192 if has_named_binding?(query, :object) do
1193 from([activity, object: o] in query,
1194 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1195 )
1196 else
1197 query
1198 end
1199 end
1200
1201 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1202
1203 defp exclude_chat_messages(query, _) do
1204 if has_named_binding?(query, :object) do
1205 from([activity, object: o] in query,
1206 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1207 )
1208 else
1209 query
1210 end
1211 end
1212
1213 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1214
1215 defp exclude_invisible_actors(query, _opts) do
1216 invisible_ap_ids =
1217 User.Query.build(%{invisible: true, select: [:ap_id]})
1218 |> Repo.all()
1219 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1220
1221 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1222 end
1223
1224 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1225 from(activity in query, where: activity.id != ^id)
1226 end
1227
1228 defp exclude_id(query, _), do: query
1229
1230 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1231
1232 defp maybe_preload_objects(query, _) do
1233 query
1234 |> Activity.with_preloaded_object()
1235 end
1236
1237 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1238
1239 defp maybe_preload_bookmarks(query, opts) do
1240 query
1241 |> Activity.with_preloaded_bookmark(opts[:user])
1242 end
1243
1244 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1245 query
1246 |> Activity.with_preloaded_report_notes()
1247 end
1248
1249 defp maybe_preload_report_notes(query, _), do: query
1250
1251 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1252
1253 defp maybe_set_thread_muted_field(query, opts) do
1254 query
1255 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1256 end
1257
1258 defp maybe_order(query, %{order: :desc}) do
1259 query
1260 |> order_by(desc: :id)
1261 end
1262
1263 defp maybe_order(query, %{order: :asc}) do
1264 query
1265 |> order_by(asc: :id)
1266 end
1267
1268 defp maybe_order(query, _), do: query
1269
1270 defp normalize_fetch_activities_query_opts(opts) do
1271 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1272 case opts[key] do
1273 value when is_bitstring(value) ->
1274 Map.put(opts, key, Hashtag.normalize_name(value))
1275
1276 value when is_list(value) ->
1277 normalized_value =
1278 value
1279 |> Enum.map(&Hashtag.normalize_name/1)
1280 |> Enum.uniq()
1281
1282 Map.put(opts, key, normalized_value)
1283
1284 _ ->
1285 opts
1286 end
1287 end)
1288 end
1289
1290 defp fetch_activities_query_ap_ids_ops(opts) do
1291 source_user = opts[:muting_user]
1292 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1293
1294 ap_id_relationships =
1295 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1296 [:block | ap_id_relationships]
1297 else
1298 ap_id_relationships
1299 end
1300
1301 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1302
1303 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1304 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1305
1306 restrict_muted_reblogs_opts =
1307 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1308
1309 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1310 end
1311
1312 def fetch_activities_query(recipients, opts \\ %{}) do
1313 opts = normalize_fetch_activities_query_opts(opts)
1314
1315 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1316 fetch_activities_query_ap_ids_ops(opts)
1317
1318 config = %{
1319 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1320 }
1321
1322 query =
1323 Activity
1324 |> maybe_preload_objects(opts)
1325 |> maybe_preload_bookmarks(opts)
1326 |> maybe_preload_report_notes(opts)
1327 |> maybe_set_thread_muted_field(opts)
1328 |> maybe_order(opts)
1329 |> restrict_recipients(recipients, opts[:user])
1330 |> restrict_replies(opts)
1331 |> restrict_since(opts)
1332 |> restrict_local(opts)
1333 |> restrict_remote(opts)
1334 |> restrict_actor(opts)
1335 |> restrict_type(opts)
1336 |> restrict_state(opts)
1337 |> restrict_favorited_by(opts)
1338 |> restrict_blocked(restrict_blocked_opts)
1339 |> restrict_blockers_visibility(opts)
1340 |> restrict_muted(restrict_muted_opts)
1341 |> restrict_filtered(opts)
1342 |> restrict_media(opts)
1343 |> restrict_visibility(opts)
1344 |> restrict_thread_visibility(opts, config)
1345 |> restrict_reblogs(opts)
1346 |> restrict_pinned(opts)
1347 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1348 |> restrict_instance(opts)
1349 |> restrict_announce_object_actor(opts)
1350 |> restrict_filtered(opts)
1351 |> Activity.restrict_deactivated_users()
1352 |> exclude_poll_votes(opts)
1353 |> exclude_chat_messages(opts)
1354 |> exclude_invisible_actors(opts)
1355 |> exclude_visibility(opts)
1356
1357 if Config.feature_enabled?(:improved_hashtag_timeline) do
1358 query
1359 |> restrict_hashtag_any(opts)
1360 |> restrict_hashtag_all(opts)
1361 |> restrict_hashtag_reject_any(opts)
1362 else
1363 query
1364 |> restrict_embedded_tag_any(opts)
1365 |> restrict_embedded_tag_all(opts)
1366 |> restrict_embedded_tag_reject_any(opts)
1367 end
1368 end
1369
1370 @doc """
1371 Fetch favorites activities of user with order by sort adds to favorites
1372 """
1373 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1374 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1375 user.ap_id
1376 |> Activity.Queries.by_actor()
1377 |> Activity.Queries.by_type("Like")
1378 |> Activity.with_joined_object()
1379 |> Object.with_joined_activity()
1380 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1381 |> order_by([like, _, _], desc_nulls_last: like.id)
1382 |> Pagination.fetch_paginated(
1383 Map.merge(params, %{skip_order: true}),
1384 pagination
1385 )
1386 end
1387
1388 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1389 Enum.map(activities, fn
1390 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1391 if Enum.any?(bcc, &(&1 in list_memberships)) do
1392 update_in(activity.data["cc"], &[user_ap_id | &1])
1393 else
1394 activity
1395 end
1396
1397 activity ->
1398 activity
1399 end)
1400 end
1401
1402 defp maybe_update_cc(activities, _, _), do: activities
1403
1404 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1405 from(activity in query,
1406 where:
1407 fragment("? && ?", activity.recipients, ^recipients) or
1408 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1409 ^Constants.as_public() in activity.recipients)
1410 )
1411 end
1412
1413 def fetch_activities_bounded(
1414 recipients,
1415 recipients_with_public,
1416 opts \\ %{},
1417 pagination \\ :keyset
1418 ) do
1419 fetch_activities_query([], opts)
1420 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1421 |> Pagination.fetch_paginated(opts, pagination)
1422 |> Enum.reverse()
1423 end
1424
1425 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1426 def upload(file, opts \\ []) do
1427 with {:ok, data} <- Upload.store(file, opts) do
1428 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1429
1430 Repo.insert(%Object{data: obj_data})
1431 end
1432 end
1433
1434 @spec get_actor_url(any()) :: binary() | nil
1435 defp get_actor_url(url) when is_binary(url), do: url
1436 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1437
1438 defp get_actor_url(url) when is_list(url) do
1439 url
1440 |> List.first()
1441 |> get_actor_url()
1442 end
1443
1444 defp get_actor_url(_url), do: nil
1445
1446 defp normalize_image(%{"url" => url}) do
1447 %{
1448 "type" => "Image",
1449 "url" => [%{"href" => url}]
1450 }
1451 end
1452
1453 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1454 defp normalize_image(_), do: nil
1455
1456 defp object_to_user_data(data) do
1457 fields =
1458 data
1459 |> Map.get("attachment", [])
1460 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1461 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1462
1463 emojis =
1464 data
1465 |> Map.get("tag", [])
1466 |> Enum.filter(fn
1467 %{"type" => "Emoji"} -> true
1468 _ -> false
1469 end)
1470 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1471 {String.trim(name, ":"), url}
1472 end)
1473
1474 is_locked = data["manuallyApprovesFollowers"] || false
1475 capabilities = data["capabilities"] || %{}
1476 accepts_chat_messages = capabilities["acceptsChatMessages"]
1477 data = Transmogrifier.maybe_fix_user_object(data)
1478 is_discoverable = data["discoverable"] || false
1479 invisible = data["invisible"] || false
1480 actor_type = data["type"] || "Person"
1481
1482 featured_address = data["featured"]
1483 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1484
1485 public_key =
1486 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1487 data["publicKey"]["publicKeyPem"]
1488 else
1489 nil
1490 end
1491
1492 shared_inbox =
1493 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1494 data["endpoints"]["sharedInbox"]
1495 else
1496 nil
1497 end
1498
1499 user_data = %{
1500 ap_id: data["id"],
1501 uri: get_actor_url(data["url"]),
1502 ap_enabled: true,
1503 banner: normalize_image(data["image"]),
1504 fields: fields,
1505 emoji: emojis,
1506 is_locked: is_locked,
1507 is_discoverable: is_discoverable,
1508 invisible: invisible,
1509 avatar: normalize_image(data["icon"]),
1510 name: data["name"],
1511 follower_address: data["followers"],
1512 following_address: data["following"],
1513 featured_address: featured_address,
1514 bio: data["summary"] || "",
1515 actor_type: actor_type,
1516 also_known_as: Map.get(data, "alsoKnownAs", []),
1517 public_key: public_key,
1518 inbox: data["inbox"],
1519 shared_inbox: shared_inbox,
1520 accepts_chat_messages: accepts_chat_messages,
1521 pinned_objects: pinned_objects
1522 }
1523
1524 # nickname can be nil because of virtual actors
1525 if data["preferredUsername"] do
1526 Map.put(
1527 user_data,
1528 :nickname,
1529 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1530 )
1531 else
1532 Map.put(user_data, :nickname, nil)
1533 end
1534 end
1535
1536 def fetch_follow_information_for_user(user) do
1537 with {:ok, following_data} <-
1538 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1539 {:ok, hide_follows} <- collection_private(following_data),
1540 {:ok, followers_data} <-
1541 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1542 {:ok, hide_followers} <- collection_private(followers_data) do
1543 {:ok,
1544 %{
1545 hide_follows: hide_follows,
1546 follower_count: normalize_counter(followers_data["totalItems"]),
1547 following_count: normalize_counter(following_data["totalItems"]),
1548 hide_followers: hide_followers
1549 }}
1550 else
1551 {:error, _} = e -> e
1552 e -> {:error, e}
1553 end
1554 end
1555
1556 defp normalize_counter(counter) when is_integer(counter), do: counter
1557 defp normalize_counter(_), do: 0
1558
1559 def maybe_update_follow_information(user_data) do
1560 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1561 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1562 {_, true} <-
1563 {:collections_available,
1564 !!(user_data[:following_address] && user_data[:follower_address])},
1565 {:ok, info} <-
1566 fetch_follow_information_for_user(user_data) do
1567 info = Map.merge(user_data[:info] || %{}, info)
1568
1569 user_data
1570 |> Map.put(:info, info)
1571 else
1572 {:user_type_check, false} ->
1573 user_data
1574
1575 {:collections_available, false} ->
1576 user_data
1577
1578 {:enabled, false} ->
1579 user_data
1580
1581 e ->
1582 Logger.error(
1583 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1584 )
1585
1586 user_data
1587 end
1588 end
1589
1590 defp collection_private(%{"first" => %{"type" => type}})
1591 when type in ["CollectionPage", "OrderedCollectionPage"],
1592 do: {:ok, false}
1593
1594 defp collection_private(%{"first" => first}) do
1595 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1596 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1597 {:ok, false}
1598 else
1599 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1600 {:error, _} = e -> e
1601 e -> {:error, e}
1602 end
1603 end
1604
1605 defp collection_private(_data), do: {:ok, true}
1606
1607 def user_data_from_user_object(data) do
1608 with {:ok, data} <- MRF.filter(data) do
1609 {:ok, object_to_user_data(data)}
1610 else
1611 e -> {:error, e}
1612 end
1613 end
1614
1615 def fetch_and_prepare_user_from_ap_id(ap_id) do
1616 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1617 {:ok, data} <- user_data_from_user_object(data) do
1618 {:ok, maybe_update_follow_information(data)}
1619 else
1620 # If this has been deleted, only log a debug and not an error
1621 {:error, "Object has been deleted" = e} ->
1622 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1623 {:error, e}
1624
1625 {:error, {:reject, reason} = e} ->
1626 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1627 {:error, e}
1628
1629 {:error, e} ->
1630 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1631 {:error, e}
1632 end
1633 end
1634
1635 def maybe_handle_clashing_nickname(data) do
1636 with nickname when is_binary(nickname) <- data[:nickname],
1637 %User{} = old_user <- User.get_by_nickname(nickname),
1638 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1639 Logger.info(
1640 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1641 )
1642
1643 old_user
1644 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1645 |> User.update_and_set_cache()
1646 else
1647 {:ap_id_comparison, true} ->
1648 Logger.info(
1649 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1650 )
1651
1652 _ ->
1653 nil
1654 end
1655 end
1656
1657 def pin_data_from_featured_collection(%{
1658 "type" => type,
1659 "orderedItems" => objects
1660 })
1661 when type in ["OrderedCollection", "Collection"] do
1662 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1663 end
1664
1665 def fetch_and_prepare_featured_from_ap_id(nil) do
1666 {:ok, %{}}
1667 end
1668
1669 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1670 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1671 {:ok, pin_data_from_featured_collection(data)}
1672 else
1673 e ->
1674 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1675 {:ok, %{}}
1676 end
1677 end
1678
1679 def pinned_fetch_task(nil), do: nil
1680
1681 def pinned_fetch_task(%{pinned_objects: pins}) do
1682 if Enum.all?(pins, fn {ap_id, _} ->
1683 Object.get_cached_by_ap_id(ap_id) ||
1684 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1685 end) do
1686 :ok
1687 else
1688 :error
1689 end
1690 end
1691
1692 def make_user_from_ap_id(ap_id) do
1693 user = User.get_cached_by_ap_id(ap_id)
1694
1695 if user && !User.ap_enabled?(user) do
1696 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1697 else
1698 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1699 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1700
1701 if user do
1702 user
1703 |> User.remote_user_changeset(data)
1704 |> User.update_and_set_cache()
1705 else
1706 maybe_handle_clashing_nickname(data)
1707
1708 data
1709 |> User.remote_user_changeset()
1710 |> Repo.insert()
1711 |> User.set_cache()
1712 end
1713 end
1714 end
1715 end
1716
1717 def make_user_from_nickname(nickname) do
1718 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1719 make_user_from_ap_id(ap_id)
1720 else
1721 _e -> {:error, "No AP id in WebFinger"}
1722 end
1723 end
1724
1725 # filter out broken threads
1726 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1727 entire_thread_visible_for_user?(activity, user)
1728 end
1729
1730 # do post-processing on a specific activity
1731 def contain_activity(%Activity{} = activity, %User{} = user) do
1732 contain_broken_threads(activity, user)
1733 end
1734
1735 def fetch_direct_messages_query do
1736 Activity
1737 |> restrict_type(%{type: "Create"})
1738 |> restrict_visibility(%{visibility: "direct"})
1739 |> order_by([activity], asc: activity.id)
1740 end
1741 end