Add support for a `first` reference in pinned objects
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
29
30 import Ecto.Query
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
33
34 require Logger
35 require Pleroma.Constants
36
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`. `to` and `cc` are the values stored under the
# "to" and "cc" keys (defaulting to `[]`); `recipients` is the concatenation of
# "to", "cc" and "bcc".
#
# For "Create" activities the actor is appended to the recipients and duplicates
# are removed. A missing or nil actor contributes nothing: the previous default
# of `[]` ended up as a stray empty-list element inside the recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # List.wrap/1 maps nil to [] and a single actor id to a one-element list.
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: no actor, no de-duplication.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
56
# Decides whether the actor named in an activity map may insert it.
#
# "Delete" and "Undo" activities are always allowed. Otherwise, when "actor" is
# a binary, the cached user for that AP id must be a `%User{}` with
# `is_active: true`. Maps without a binary "actor" are allowed.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
68
# Returns true when the embedded object's "content" is no longer than the
# configured `[:instance, :remote_limit]`. Activities without a non-nil
# "content" in their "object" always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_), do: true
75
# Bumps the actor's note count via `User.increase_note_count/1` when `object`
# is public; otherwise returns `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
79
# Lowers the actor's note count via `User.decrease_note_count/1` when `object`
# is public; otherwise returns `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
83
# Calls `User.update_last_status_at/1` for the actor when `object` is public;
# otherwise returns `{:ok, actor}` untouched.
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
87
# For a "Create" whose object carries an "inReplyTo" key, increases the replies
# count of the replied-to object, but only when the new object is public
# (returns nil otherwise). Any other input is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
98
@object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]

# Persists either an object or an activity.
#
# Maps whose "type" is one of @object_types are stored with `Object.create/1`.
# Everything else is inserted as an `%Activity{}`; `meta` must then contain
# `:local`. Success is `{:ok, stored, meta}`; a failing step is returned as is.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    failure -> failure
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
123
# Inserts an activity map, running it through the actor check, the remote
# content-length limit, MRF filtering and containment first.
#
# * An activity that `Activity.normalize/1` already resolves is returned as
#   `{:ok, activity}` without inserting anything.
# * With `fake: true` nothing is stored; an `%Activity{}` with the id
#   "pleroma:fakeid" is built and returned instead.
# * `bypass_actor_check: true` skips `check_actor_can_insert/1`.
#
# On a real insert, rich media data is fetched in a limited background task and
# local activities are added to the search index.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(data)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, filtered} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Add local posts to search index
    if local, do: Pleroma.Search.add_to_index(activity)

    {:ok, activity}
  else
    # Already known: hand back the existing activity.
    %Activity{} = existing ->
      {:ok, existing}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:containment, _} = failure ->
      failure

    {:error, _} = failure ->
      failure
  end
end
179
# Inserts an `%Activity{}` built from `data` and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
192
# Creates notifications for `activity`, creates or bumps its conversation, then
# streams out the activity followed by the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
201
# Enqueues a `PurgeExpiredActivity` job when the activity data has a
# `%DateTime{}` under "expires_at". Returns `{:ok, activity}` on success (or
# when there is nothing to schedule) and the enqueue result otherwise.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
215
# Creates or bumps the conversation for `activity` and marks it as read for the
# user behind `actor`. Returns `{:ok, conversation}`; if either lookup yields
# something else, that value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
223
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`; any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
231
# Preloads the user of each participation and pushes them to the
# "participation" stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
239
# Streams the participations of the conversation an object belongs to, but only
# when that conversation has a direct activity visible with `user` as both the
# reading and the blocking user. Objects without a "context" are a `:noop`.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
259
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by `Topics.get_activity_topics/1`; every other input is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
272
# Runs `do_create/2` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; any other transaction result is returned as is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
279
# Builds and inserts a "Create" activity, then performs the follow-up work:
# replies count, note count, last-status timestamp, notifications/streaming,
# poll scheduling and federation.
#
# Fake inserts stop right after `insert/3`; in the `:benchmark` env the
# function stops after the replies count. An `{:error, reason}` from any step
# rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
314
# Asks `PollWorker` to schedule the poll end for `activity`; the worker's
# result is ignored and `:ok` is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
319
# Builds a listen activity from `params`, inserts it, notifies/streams it and
# federates it. Returns `{:ok, activity}` or the first non-matching step result.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
339
# Runs `do_unfollow/4` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; any other transaction result is returned as is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    other -> other
  end
end
348
# Marks the latest follow activity between the two users as "cancelled" and
# inserts, streams and federates the matching unfollow activity.
#
# Returns nil when no follow activity exists; an `{:error, reason}` from any
# step rolls the surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
362
# Runs `do_flag/1` inside a database transaction and unwraps the transaction's
# `{:ok, result}`; any other transaction result is returned as is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
369
# Inserts a flag (report) activity, streams it, federates a stripped copy and
# mails the report to every superuser that has an email address and is not the
# reporting actor.
#
# The activity is addressed to nobody ("to" is empty); the reported account is
# put into "cc" unless `params[:forward]` is explicitly `false`. An
# `{:error, reason}` from any step rolls the surrounding transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.filter(fn user -> user.ap_id != actor and not is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
412
# Inserts a "Move" activity from `origin` to `target`, streams and federates it
# and enqueues the "move_following" background job.
#
# The origin's AP id must be listed in the target's `also_known_as`; otherwise
# an error tuple is returned. A failed insert is returned unchanged.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    with {:ok, activity} <- insert(params, local) do
      notify_and_stream(activity)
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
438
# Builds the query for the "Create" activities of a context, newest first.
#
# Recipients are the public collection plus, when `opts[:user]` is set, that
# user's AP id and everything they follow. Block, filter, poll-vote and id
# exclusions are applied from `opts`.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    if user = opts[:user] do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
469
# Loads every activity matched by `fetch_activities_for_context_query/2`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
476
# Returns the id of the newest direct-visibility activity in a context, or nil.
# Preloading is skipped unless `opts` overrides `:skip_preload`.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
487
# Paginates `query` with `:skip_extra_order` forced on.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
495
# Fetches a page of activities addressed to `recipients` or to any list
# membership of `opts[:user]`, reverses the page and passes it through
# `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
504
# Fetches a page of activities addressed to the public collection. The `:user`
# option is dropped; `restrict_unlisted/2` is applied according to `opts`.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], opts)

  query
  |> restrict_unlisted(opts)
  |> fetch_paginated_optimized(opts, pagination)
end
514
# Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
# switched on.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
521
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(actor, recipients,
# data)` matches `opts[:visibility]`, given as one value or a list of values
# from @valid_visibilities.
#
# Invalid values are logged and the query is returned unchanged, mirroring
# `exclude_visibility/2`. Previously those branches returned the result of
# `Logger.error/1` (`:ok`) in place of a query, which broke whatever was piped
# after this function. A nil or absent `:visibility` leaves the query untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# nil is excluded here so that it falls through to the no-op clause below.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
558
# Removes activities whose `activity_visibility(actor, recipients, data)`
# matches `opts[:exclude_visibilities]` (one value or a list of values from
# @valid_visibilities). Invalid values are logged and the query is returned
# unchanged; nil or a missing key is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  else
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
601
# Applies the `thread_visibility` SQL check for the reading user's AP id.
# Skipped when the third argument or the user has `skip_thread_containment: true`,
# or when no `%User{}` is given.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
616
# Fetches activities authored by `user` as seen by `reading_user`, without any
# activity type restriction. The result of `fetch_activities/2` is reversed
# once more before it is returned.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
631
# Fetches the "Create"/"Announce" activities of `user` as seen by
# `reading_user`, in reversed order. With `total: true` in `params` the fetch
# result is treated as a keyword list and only its `:items` entry is reversed.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  Enum.reverse(fetch_activities_for_user(user, reading_user, params))
end
645
# Shared fetch behind `fetch_user_activities/3`: restricts to "Create" and
# "Announce" activities by `user`, passes the ids of the user's pinned objects
# along, and applies the reading user's blocks and mutes unless the reading
# user blocks `user`. Pagination defaults to `:keyset`.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
672
# Fetches "Create"/"Announce" activities visible to `reading_user`, in reversed
# order. With `total: true` in `params` the fetch result is treated as a
# keyword list and only its `:items` entry is reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  Enum.reverse(fetch_activities_for_reading_user(reading_user, params))
end
683
# Shared fetch behind `fetch_statuses/2`: "Create" and "Announce" activities
# for the reading user's recipients, using `:offset` pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
694
# Recipient list used when reading a user's activities: empty in godmode,
# otherwise the public collection plus, for a logged-in reader, the reader's
# own AP id and everything they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
704
# Drops "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
724
# Keeps only activities with an id greater than `:since_id`. An empty string or
# a missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
732
# Keeps objects whose embedded "tag" array contains all of `:tag_all`. A single
# binary tag is delegated to `restrict_embedded_tag_any/2`. Needs the joined
# object, so it raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
749
# Keeps objects whose embedded "tag" array contains at least one of `:tag`
# (a non-empty list or a single binary). Needs the joined object, so it raises
# when combined with `skip_preload: true`.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
766
# Drops objects whose embedded "tag" array contains any of `:tag_reject`
# (a non-empty list or a single binary). Needs the joined object, so it raises
# when combined with `skip_preload: true`.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
784
# Query selecting the distinct object ids from "hashtags_objects" that are
# linked to a hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(
    hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
792
# Keeps objects linked (through the hashtags tables) to every tag in
# `:tag_all`. A one-element list is delegated to `restrict_hashtag_any/2` and a
# single binary is wrapped into a list first. Needs the joined object, so it
# raises when combined with `skip_preload: true`.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
823
# Keeps objects linked (through "hashtags_objects") to at least one of the tags
# in `:tag`. The hashtag ids are resolved with a separate query first; the
# result is made distinct on, and ordered by, the object id (descending).
# Needs the joined object, so it raises when combined with `skip_preload: true`.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids = Repo.all(from(ht in Hashtag, where: ht.name in ^tags, select: ht.id))

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
849
# Drops objects linked (through the hashtags tables) to any tag in
# `:tag_reject` (a non-empty list or a single binary). Needs the joined object,
# so it raises when combined with `skip_preload: true`.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
866
# Raised by the tag restrictions when they are used with `skip_preload: true`,
# because they filter on the joined object.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
870
# Keeps activities whose recipients overlap with `recipients`. With a user, the
# user's own activities are kept as well (OR-ed in). An empty recipient list
# leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
884
# Keeps only local activities when `local_only: true` is given.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
890
# Keeps only non-local activities when `remote: true` is given.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
896
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
902
# Keeps only activities whose "type" equals `:type` (a binary) or is contained
# in `:type` (anything else, passed to SQL `ANY`).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
912
# Keeps only activities whose "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
918
# Keeps only activities whose joined object lists `:favorited_by` in its
# "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
927
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, so it raises when
# `:only_media` is combined with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
941
# Reply filtering.
#
# * `exclude_replies: true` drops every object that has an "inReplyTo".
# * `reply_visibility: "self"` keeps non-replies and activities addressed to
#   the `:reply_filtering_user`.
# * `reply_visibility: "following"` keeps non-Create activities, non-replies,
#   activities addressed to the user or their friends, and the user's own.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
995
# Drops "Announce" activities when `exclude_reblogs: true` is given.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
1001
# Hides activities by muted actors and activities addressed ("to") to muted
# actors, unless the muting user authored them. Unless `:skip_preload` is set,
# rows with a matching thread mute are hidden as well.
#
# `with_muted: true` disables the filter; the muted AP ids can be supplied via
# `:muted_users_ap_ids`, otherwise they are looked up for the user.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1028
# Applies the blocking user's user blocks and domain blocks to `query`.
#
# The blocked AP ids can be supplied via `:blocked_users_ap_ids`, otherwise
# they are looked up for the user. The object is joined first if the query has
# no `:object` named binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1095
# Hides activities from users who block `opts[:blocking_user]`, unless the
# `[:activitypub, :blockers_visible]` setting is exactly `true`.
# Without a `:blocking_user` the query is returned untouched.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blockers = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      from(
        a in query,
        # The author does not block the viewer
        where: fragment("not (? = ANY(?))", a.actor, ^blockers),

        # Not an Announce addressed to somebody who blocks the viewer
        where:
          fragment(
            "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
            a.data,
            a.data,
            ^blockers
          )
      )
  end
end

defp restrict_blockers_visibility(query, _), do: query
1120
# With `restrict_unlisted: true`, drops activities whose "cc" contains the
# public address. A missing "cc" is treated as an empty JSON object.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [a],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      a.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1134
# With `pinned: true`, keeps only Create activities whose object id is one of
# `pinned_object_ids`. The object reference may be an embedded map (its "id"
# is used) or a plain id string. The query must carry the `:object` named
# binding.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [a, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      a.data,
      a.data,
      a.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _), do: query
1150
# Drops Announce activities authored by users whose reblogs are muted by
# `opts[:muting_user]`. The id list is read from
# `opts[:reblog_muted_users_ap_ids]` or fetched via
# `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [a],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      a.data,
      a.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1167
# Keeps only activities whose actor URL has `instance` as its host part
# (third "/"-separated segment). Applied only when `:instance` is a binary.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
1176
# Applies the user's content filters: objects whose "content" matches the
# regex built by `Filter.compose_regex/1` are dropped, except for activities
# authored by the user. The object is expected as the second positional
# binding of the query. `:blocking_user` is accepted as a fallback for `:user`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    from([a, obj] in query,
      where:
        fragment("not(?->>'content' ~* ?)", obj.data, ^regex) or
          a.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
1196
# Drops activities whose object is of type "Answer", unless
# `include_poll_votes: true` is set. Nothing is filtered when the query has no
# `:object` named binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1208
# Drops activities whose object is of type "ChatMessage", unless
# `include_chat_messages: true` is set. Nothing is filtered when the query has
# no `:object` named binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1220
# Drops activities authored by users flagged as invisible, unless
# `invisible_actors: true` is set. The invisible AP ids are loaded from the
# database on every call.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  hidden_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(fn %{ap_id: id} -> id end)

  where(query, [a], a.actor not in ^hidden_ap_ids)
end
1231
# Leaves out the activity with the given id when `:exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [a], a.id != ^id)

defp exclude_id(query, _), do: query
1237
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1244
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1251
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1258
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1265
# Orders by id in the direction given by `:order` (`:desc` or `:asc`);
# any other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1277
# Normalizes the hashtag options `:tag`, `:tag_all` and `:tag_reject` through
# `Hashtag.normalize_name/1`. A single value is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values are left alone.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, acc ->
    current = acc[key]

    cond do
      is_bitstring(current) ->
        Map.put(acc, key, Hashtag.normalize_name(current))

      is_list(current) ->
        unique_names =
          current
          |> Enum.map(&Hashtag.normalize_name/1)
          |> Enum.uniq()

        Map.put(acc, key, unique_names)

      true ->
        acc
    end
  end)
end
1297
# Loads the outgoing relationship AP ids of `opts[:muting_user]` with one call
# to `User.outgoing_relationships_ap_ids/2` and returns three option maps
# `{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}`.
#
# Blocks are loaded along only when `opts[:blocking_user]` is the same user as
# `opts[:muting_user]`. Keys already present in `opts` take precedence over
# the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_kinds = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_kinds =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_kinds]
    else
      mute_kinds
    end

  preloaded = User.outgoing_relationships_ap_ids(muting_user, relationship_kinds)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1319
@doc """
Builds the Ecto query used to fetch activities addressed to `recipients`.

`opts` is a map of filter and preload options. Hashtag options are normalized
first, then every `restrict_*` / `exclude_*` / `maybe_*` step inspects the
options it cares about and either narrows the query or passes it through.
Hashtag filtering uses the join-based restrictions when the
`:improved_hashtag_timeline` feature is enabled and the embedded-tag
restrictions otherwise.

Returns the query; nothing is executed here apart from the lookups individual
steps perform to collect AP id lists.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied once only: applying it a second time just
  # appended an identical predicate to the WHERE clause.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1377
@doc """
Fetches the activities a user has favourited, newest favourite first.

Results are ordered by the id of the `Like` activity (descending, nulls last),
which is also exposed as `pagination_id` on every returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  # Ordering is set here, so the pagination layer is told to skip its own.
  pagination_params = Map.put(params, :skip_order, true)

  user.ap_id
  |> Activity.Queries.by_actor()
  |> Activity.Queries.by_type("Like")
  |> Activity.with_joined_object()
  |> Object.with_joined_activity()
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1395
# Prepends the user's AP id to the "cc" of every activity whose "bcc" mentions
# one of the given list memberships. Activities without a non-empty "bcc", and
# calls with an empty membership list or without a user, pass through as-is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `List.wrap/1` keeps the result a proper list when "cc" is missing
        # (nil) or holds a single value instead of a list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1411
# Keeps activities addressed to any of `recipients`, plus public activities
# addressed to any of `recipients_with_public`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [a],
    fragment("? && ?", a.recipients, ^recipients) or
      (fragment("? && ?", a.recipients, ^recipients_with_public) and
         ^Constants.as_public() in a.recipients)
  )
end
1420
# Runs the bounded recipients query with pagination and returns the fetched
# page in reversed order.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1432
# Stores `file` through `Upload.store/2` and inserts an Object built from the
# resulting data, with "actor" set when `opts[:actor]` is present. A failed
# store is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1441
# Extracts a URL string from an actor's "url" value: a plain string, a link
# map with a binary "href", or a list whose first element is one of those.
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1453
# Converts an image map (or the first entry of a list of them) into the
# canonical `%{"type" => "Image", "url" => [%{"href" => url}]}` shape.
# Returns nil for anything without a "url".
defp normalize_image(%{"url" => url}),
  do: %{"type" => "Image", "url" => [%{"href" => url}]}

defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_), do: nil
1463
# Converts a remote actor object (string-keyed map) into the attribute map
# used to build or update a remote user.
#
# Profile fields are read from "PropertyValue" attachments and custom emoji
# from "Emoji" tags. Both are collected from the incoming data before
# `Transmogrifier.maybe_fix_user_object/1` is applied. Entries of any other
# shape are ignored, so one malformed attachment or tag sent by a remote
# server cannot crash the whole conversion.
#
# The featured collection is fetched as part of this call to populate
# `:pinned_objects`. `:nickname` is `preferredUsername@host`, or nil when the
# actor has no "preferredUsername".
defp object_to_user_data(data) do
  fields =
    data
    |> Map.get("attachment")
    # "attachment" may be absent, null or a single object instead of a list
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  emojis =
    data
    |> Map.get("tag")
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} when is_binary(name) ->
        [{String.trim(name, ":"), url}]

      # not an emoji, or an emoji without a usable name/icon
      _ ->
        []
    end)
    |> Map.new()

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1543
# Fetches the remote following and followers collections of `user` and
# returns `{:ok, map}` with both counters and whether each collection is
# hidden. Counters fall back to 0 when "totalItems" is not an integer.
# Any failure is returned as an `{:error, reason}` tuple.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers,
      hide_follows: hide_follows
    }

    {:ok, follow_info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1563
# Returns `counter` when it is an integer and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1566
# Merges remotely fetched follow counters into `user_data[:info]` when
# external user synchronization is enabled, the user type is "Person" or
# "Service", and both collection addresses are present. When a check fails
# the data is returned unchanged; any other failure is logged and the data is
# returned unchanged as well.
#
# NOTE(review): the map built by `object_to_user_data/1` in this file carries
# `:actor_type` rather than `:type`, so the type check looks like it can never
# pass for that caller - confirm before relying on this.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, follow_info))
  else
    {check, false} when check in [:enabled, :user_type_check, :collections_available] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1597
# Decides whether a remote collection is hidden.
#
# * An embedded first page of type CollectionPage/OrderedCollectionPage means
#   the collection is visible: `{:ok, false}`.
# * Otherwise the "first" page is fetched; a page of one of those types means
#   visible, while an HTTP 401/403 means hidden: `{:ok, true}`.
# * Other fetch results are returned as `{:error, reason}`.
# * A collection without "first" is treated as hidden.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1614
# Runs the actor object through `MRF.filter/1` and converts the filtered
# object into user data. Any non-`{:ok, _}` filter result is wrapped as
# `{:error, result}`.
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1622
# Fetches the actor at `ap_id` and turns it into user data, including follow
# information where applicable. Failures are logged and returned as
# `{:error, reason}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      log_user_fetch_failure(ap_id, reason)
      {:error, reason}
  end
end

# Picks the log level for a failed user fetch: a deleted object is only worth
# a debug line, a rejection is informational, everything else is an error.
defp log_user_fetch_failure(ap_id, "Object has been deleted" = e),
  do: Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

defp log_user_fetch_failure(ap_id, {:reject, reason}),
  do: Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

defp log_user_fetch_failure(ap_id, e),
  do: Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1642
# Frees up a nickname that is already held by a different user: when a user
# with `data[:nickname]` exists under another AP id, that older user is
# renamed to "<id>.<nickname>". When the AP ids are equal only an info line is
# logged. Returns nil when there is no binary nickname or no existing user.
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  existing = if is_binary(nickname), do: User.get_by_nickname(nickname)

  case existing do
    %User{} = old_user ->
      if data[:ap_id] == old_user.ap_id do
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        old_user
        |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
        |> User.update_and_set_cache()
      end

    _ ->
      nil
  end
end
1664
@doc """
Builds the pinned-objects map (`%{object_ap_id => pinned_at}`) from a featured
collection.

Supported shapes:

  * an `OrderedCollection` or `Collection` with a `"first"` page reference:
    the page is fetched and its `"orderedItems"` / `"items"` are used;
  * an `OrderedCollection` or `Collection` with inline `"orderedItems"` or
    `"items"`.

Items may be embedded objects with an `"id"` or bare id strings; anything else
is skipped. Always returns a plain map: a failed page fetch, a page without an
item list, or an unrecognised collection is logged and yields `%{}`.
"""
def pin_data_from_featured_collection(%{"type" => "OrderedCollection", "first" => first}) do
  pins_from_first_page(first, "orderedItems")
end

def pin_data_from_featured_collection(%{"type" => "Collection", "first" => first}) do
  pins_from_first_page(first, "items")
end

def pin_data_from_featured_collection(%{"type" => type, "orderedItems" => objects})
    when type in ["OrderedCollection", "Collection"] and is_list(objects) do
  pins_from_items(objects)
end

def pin_data_from_featured_collection(%{"type" => type, "items" => objects})
    when type in ["OrderedCollection", "Collection"] and is_list(objects) do
  pins_from_items(objects)
end

def pin_data_from_featured_collection(collection) do
  Logger.error("Could not parse featured collection #{inspect(collection)}")
  %{}
end

# Fetches the first page of a paged collection and extracts its pins.
defp pins_from_first_page(first, items_key) do
  with {:ok, %{} = page} <- Fetcher.fetch_and_contain_remote_object_from_id(first),
       items when is_list(items) <- Map.get(page, items_key) do
    pins_from_items(items)
  else
    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      # A bare map, matching what the success path returns.
      %{}
  end
end

# Maps every recognisable item id to the current time.
defp pins_from_items(items) do
  pinned_at = NaiveDateTime.utc_now()

  items
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [object_ap_id]
    object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
    _ -> []
  end)
  |> Map.new(&{&1, pinned_at})
end
1702
# Fetches the featured collection at `ap_id` and returns `{:ok, pins}`.
# A nil address yields `{:ok, %{}}`; a failed fetch is logged and also yields
# `{:ok, %{}}`.
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
1716
# Makes sure every pinned object is available: each AP id is looked up in the
# cache and fetched when missing. Returns :ok when all of them are available,
# :error otherwise, and nil for nil input. Checking stops at the first pin
# that cannot be obtained.
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {object_ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(object_ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(object_ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
1729
# Creates or refreshes the user identified by `ap_id`.
#
# A cached user that is not AP-enabled is handed to
# `Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched,
# a background task is started to fetch its pinned objects, and the user is
# updated (when cached) or inserted after resolving a nickname clash.
# A failed fetch is returned as-is.
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)
  needs_upgrade? = existing && !User.ap_enabled?(existing)

  if needs_upgrade? do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
        persist_remote_user(existing, data)

      failure ->
        failure
    end
  end
end

# Inserts a new remote user, or updates the cached one, from fetched data.
defp persist_remote_user(nil, data) do
  maybe_handle_clashing_nickname(data)

  data
  |> User.remote_user_changeset()
  |> Repo.insert()
  |> User.set_cache()
end

defp persist_remote_user(user, data) do
  user
  |> User.remote_user_changeset(data)
  |> User.update_and_set_cache()
end
1754
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# reports. Any lookup result without a non-nil "ap_id" becomes an error tuple.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1762
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1767
# Post-processing hook for a single activity; currently only applies the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1772
# Query for Create activities with "direct" visibility, oldest first.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [a], asc: a.id)
end
1779 end