Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into remove/mastofe
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38
39 defp get_recipients(%{"type" => "Create"} = data) do
40 to = Map.get(data, "to", [])
41 cc = Map.get(data, "cc", [])
42 bcc = Map.get(data, "bcc", [])
43 actor = Map.get(data, "actor", [])
44 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
45 {recipients, to, cc}
46 end
47
48 defp get_recipients(data) do
49 to = Map.get(data, "to", [])
50 cc = Map.get(data, "cc", [])
51 bcc = Map.get(data, "bcc", [])
52 recipients = Enum.concat([to, cc, bcc])
53 {recipients, to, cc}
54 end
55
56 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
57 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
58
59 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
60 case User.get_cached_by_ap_id(actor) do
61 %User{is_active: true} -> true
62 _ -> false
63 end
64 end
65
66 defp check_actor_can_insert(_), do: true
67
68 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
69 limit = Config.get([:instance, :remote_limit])
70 String.length(content) <= limit
71 end
72
73 defp check_remote_limit(_), do: true
74
75 def increase_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
77 end
78
79 def decrease_note_count_if_public(actor, object) do
80 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
81 end
82
83 defp increase_replies_count_if_reply(%{
84 "object" => %{"inReplyTo" => reply_ap_id} = object,
85 "type" => "Create"
86 }) do
87 if is_public?(object) do
88 Object.increase_replies_count(reply_ap_id)
89 end
90 end
91
92 defp increase_replies_count_if_reply(_create_data), do: :noop
93
94 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
95 @impl true
96 def persist(%{"type" => type} = object, meta) when type in @object_types do
97 with {:ok, object} <- Object.create(object) do
98 {:ok, object, meta}
99 end
100 end
101
102 @impl true
103 def persist(object, meta) do
104 with local <- Keyword.fetch!(meta, :local),
105 {recipients, _, _} <- get_recipients(object),
106 {:ok, activity} <-
107 Repo.insert(%Activity{
108 data: object,
109 local: local,
110 recipients: recipients,
111 actor: object["actor"]
112 }),
113 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
114 {:ok, _} <- maybe_create_activity_expiration(activity) do
115 {:ok, activity, meta}
116 end
117 end
118
119 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
120 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
121 with nil <- Activity.normalize(map),
122 map <- lazy_put_activity_defaults(map, fake),
123 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
124 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
125 {:ok, map} <- MRF.filter(map),
126 {recipients, _, _} = get_recipients(map),
127 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
128 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
129 {:ok, map, object} <- insert_full_object(map),
130 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
131 # Splice in the child object if we have one.
132 activity = Maps.put_if_present(activity, :object, object)
133
134 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
135 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
136 end)
137
138 {:ok, activity}
139 else
140 %Activity{} = activity ->
141 {:ok, activity}
142
143 {:actor_check, _} ->
144 {:error, false}
145
146 {:containment, _} = error ->
147 error
148
149 {:error, _} = error ->
150 error
151
152 {:fake, true, map, recipients} ->
153 activity = %Activity{
154 data: map,
155 local: local,
156 actor: map["actor"],
157 recipients: recipients,
158 id: "pleroma:fakeid"
159 }
160
161 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
162 {:ok, activity}
163
164 {:remote_limit_pass, _} ->
165 {:error, :remote_limit}
166
167 {:reject, _} = e ->
168 {:error, e}
169 end
170 end
171
172 defp insert_activity_with_expiration(data, local, recipients) do
173 struct = %Activity{
174 data: data,
175 local: local,
176 actor: data["actor"],
177 recipients: recipients
178 }
179
180 with {:ok, activity} <- Repo.insert(struct) do
181 maybe_create_activity_expiration(activity)
182 end
183 end
184
185 def notify_and_stream(activity) do
186 Notification.create_notifications(activity)
187
188 conversation = create_or_bump_conversation(activity, activity.actor)
189 participations = get_participations(conversation)
190 stream_out(activity)
191 stream_out_participations(participations)
192 end
193
194 defp maybe_create_activity_expiration(
195 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
196 ) do
197 with {:ok, _job} <-
198 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
199 activity_id: activity.id,
200 expires_at: expires_at
201 }) do
202 {:ok, activity}
203 end
204 end
205
206 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
207
208 defp create_or_bump_conversation(activity, actor) do
209 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
210 %User{} = user <- User.get_cached_by_ap_id(actor) do
211 Participation.mark_as_read(user, conversation)
212 {:ok, conversation}
213 end
214 end
215
216 defp get_participations({:ok, conversation}) do
217 conversation
218 |> Repo.preload(:participations, force: true)
219 |> Map.get(:participations)
220 end
221
222 defp get_participations(_), do: []
223
224 def stream_out_participations(participations) do
225 participations =
226 participations
227 |> Repo.preload(:user)
228
229 Streamer.stream("participation", participations)
230 end
231
232 @impl true
233 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
234 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
235 conversation = Repo.preload(conversation, :participations)
236
237 last_activity_id =
238 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
239 user: user,
240 blocking_user: user
241 })
242
243 if last_activity_id do
244 stream_out_participations(conversation.participations)
245 end
246 end
247 end
248
249 @impl true
250 def stream_out_participations(_, _), do: :noop
251
252 @impl true
253 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
254 when data_type in ["Create", "Announce", "Delete"] do
255 activity
256 |> Topics.get_activity_topics()
257 |> Streamer.stream(activity)
258 end
259
260 @impl true
261 def stream_out(_activity) do
262 :noop
263 end
264
265 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
266 def create(params, fake \\ false) do
267 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
268 result
269 end
270 end
271
272 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
273 additional = params[:additional] || %{}
274 # only accept false as false value
275 local = !(params[:local] == false)
276 published = params[:published]
277 quick_insert? = Config.get([:env]) == :benchmark
278
279 create_data =
280 make_create_data(
281 %{to: to, actor: actor, published: published, context: context, object: object},
282 additional
283 )
284
285 with {:ok, activity} <- insert(create_data, local, fake),
286 {:fake, false, activity} <- {:fake, fake, activity},
287 _ <- increase_replies_count_if_reply(create_data),
288 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
289 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
290 _ <- notify_and_stream(activity),
291 :ok <- maybe_federate(activity) do
292 {:ok, activity}
293 else
294 {:quick_insert, true, activity} ->
295 {:ok, activity}
296
297 {:fake, true, activity} ->
298 {:ok, activity}
299
300 {:error, message} ->
301 Repo.rollback(message)
302 end
303 end
304
305 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
306 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
307 additional = params[:additional] || %{}
308 # only accept false as false value
309 local = !(params[:local] == false)
310 published = params[:published]
311
312 listen_data =
313 make_listen_data(
314 %{to: to, actor: actor, published: published, context: context, object: object},
315 additional
316 )
317
318 with {:ok, activity} <- insert(listen_data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
321 {:ok, activity}
322 end
323 end
324
325 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
326 {:ok, Activity.t()} | nil | {:error, any()}
327 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
330 result
331 end
332 end
333
334 defp do_unfollow(follower, followed, activity_id, local) do
335 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
336 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
337 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
338 {:ok, activity} <- insert(unfollow_data, local),
339 _ <- notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
341 {:ok, activity}
342 else
343 nil -> nil
344 {:error, error} -> Repo.rollback(error)
345 end
346 end
347
348 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
349 def flag(params) do
350 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
351 result
352 end
353 end
354
355 defp do_flag(
356 %{
357 actor: actor,
358 context: _context,
359 account: account,
360 statuses: statuses,
361 content: content
362 } = params
363 ) do
364 # only accept false as false value
365 local = !(params[:local] == false)
366 forward = !(params[:forward] == false)
367
368 additional = params[:additional] || %{}
369
370 additional =
371 if forward do
372 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
373 else
374 Map.merge(additional, %{"to" => [], "cc" => []})
375 end
376
377 with flag_data <- make_flag_data(params, additional),
378 {:ok, activity} <- insert(flag_data, local),
379 {:ok, stripped_activity} <- strip_report_status_data(activity),
380 _ <- notify_and_stream(activity),
381 :ok <-
382 maybe_federate(stripped_activity) do
383 User.all_superusers()
384 |> Enum.filter(fn user -> user.ap_id != actor end)
385 |> Enum.filter(fn user -> not is_nil(user.email) end)
386 |> Enum.each(fn superuser ->
387 superuser
388 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
389 |> Pleroma.Emails.Mailer.deliver_async()
390 end)
391
392 {:ok, activity}
393 else
394 {:error, error} -> Repo.rollback(error)
395 end
396 end
397
398 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
399 def move(%User{} = origin, %User{} = target, local \\ true) do
400 params = %{
401 "type" => "Move",
402 "actor" => origin.ap_id,
403 "object" => origin.ap_id,
404 "target" => target.ap_id
405 }
406
407 with true <- origin.ap_id in target.also_known_as,
408 {:ok, activity} <- insert(params, local),
409 _ <- notify_and_stream(activity) do
410 maybe_federate(activity)
411
412 BackgroundWorker.enqueue("move_following", %{
413 "origin_id" => origin.id,
414 "target_id" => target.id
415 })
416
417 {:ok, activity}
418 else
419 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
420 err -> err
421 end
422 end
423
424 def fetch_activities_for_context_query(context, opts) do
425 public = [Constants.as_public()]
426
427 recipients =
428 if opts[:user],
429 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
430 else: public
431
432 from(activity in Activity)
433 |> maybe_preload_objects(opts)
434 |> maybe_preload_bookmarks(opts)
435 |> maybe_set_thread_muted_field(opts)
436 |> restrict_blocked(opts)
437 |> restrict_recipients(recipients, opts[:user])
438 |> restrict_filtered(opts)
439 |> where(
440 [activity],
441 fragment(
442 "?->>'type' = ? and ?->>'context' = ?",
443 activity.data,
444 "Create",
445 activity.data,
446 ^context
447 )
448 )
449 |> exclude_poll_votes(opts)
450 |> exclude_id(opts)
451 |> order_by([activity], desc: activity.id)
452 end
453
454 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
455 def fetch_activities_for_context(context, opts \\ %{}) do
456 context
457 |> fetch_activities_for_context_query(opts)
458 |> Repo.all()
459 end
460
461 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
462 FlakeId.Ecto.CompatType.t() | nil
463 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
464 context
465 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
466 |> restrict_visibility(%{visibility: "direct"})
467 |> limit(1)
468 |> select([a], a.id)
469 |> Repo.one()
470 end
471
472 defp fetch_paginated_optimized(query, opts, pagination) do
473 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
474 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
475 opts = Map.put(opts, :skip_extra_order, true)
476
477 Pagination.fetch_paginated(query, opts, pagination)
478 end
479
480 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
481 list_memberships = Pleroma.List.memberships(opts[:user])
482
483 fetch_activities_query(recipients ++ list_memberships, opts)
484 |> fetch_paginated_optimized(opts, pagination)
485 |> Enum.reverse()
486 |> maybe_update_cc(list_memberships, opts[:user])
487 end
488
489 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
490 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
491 opts = Map.delete(opts, :user)
492
493 [Constants.as_public()]
494 |> fetch_activities_query(opts)
495 |> restrict_unlisted(opts)
496 |> fetch_paginated_optimized(opts, pagination)
497 end
498
499 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
500 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
501 opts
502 |> Map.put(:restrict_unlisted, true)
503 |> fetch_public_or_unlisted_activities(pagination)
504 end
505
506 @valid_visibilities ~w[direct unlisted public private]
507
508 defp restrict_visibility(query, %{visibility: visibility})
509 when is_list(visibility) do
510 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
511 from(
512 a in query,
513 where:
514 fragment(
515 "activity_visibility(?, ?, ?) = ANY (?)",
516 a.actor,
517 a.recipients,
518 a.data,
519 ^visibility
520 )
521 )
522 else
523 Logger.error("Could not restrict visibility to #{visibility}")
524 end
525 end
526
527 defp restrict_visibility(query, %{visibility: visibility})
528 when visibility in @valid_visibilities do
529 from(
530 a in query,
531 where:
532 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
533 )
534 end
535
536 defp restrict_visibility(_query, %{visibility: visibility})
537 when visibility not in @valid_visibilities do
538 Logger.error("Could not restrict visibility to #{visibility}")
539 end
540
541 defp restrict_visibility(query, _visibility), do: query
542
543 defp exclude_visibility(query, %{exclude_visibilities: visibility})
544 when is_list(visibility) do
545 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
546 from(
547 a in query,
548 where:
549 not fragment(
550 "activity_visibility(?, ?, ?) = ANY (?)",
551 a.actor,
552 a.recipients,
553 a.data,
554 ^visibility
555 )
556 )
557 else
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561 end
562
563 defp exclude_visibility(query, %{exclude_visibilities: visibility})
564 when visibility in @valid_visibilities do
565 from(
566 a in query,
567 where:
568 not fragment(
569 "activity_visibility(?, ?, ?) = ?",
570 a.actor,
571 a.recipients,
572 a.data,
573 ^visibility
574 )
575 )
576 end
577
578 defp exclude_visibility(query, %{exclude_visibilities: visibility})
579 when visibility not in [nil | @valid_visibilities] do
580 Logger.error("Could not exclude visibility to #{visibility}")
581 query
582 end
583
584 defp exclude_visibility(query, _visibility), do: query
585
586 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
587 do: query
588
589 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
590 do: query
591
592 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
593 from(
594 a in query,
595 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
596 )
597 end
598
599 defp restrict_thread_visibility(query, _, _), do: query
600
601 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
602 params =
603 params
604 |> Map.put(:user, reading_user)
605 |> Map.put(:actor_id, user.ap_id)
606
607 %{
608 godmode: params[:godmode],
609 reading_user: reading_user
610 }
611 |> user_activities_recipients()
612 |> fetch_activities(params)
613 |> Enum.reverse()
614 end
615
616 def fetch_user_activities(user, reading_user, params \\ %{})
617
618 def fetch_user_activities(user, reading_user, %{total: true} = params) do
619 result = fetch_activities_for_user(user, reading_user, params)
620
621 Keyword.put(result, :items, Enum.reverse(result[:items]))
622 end
623
624 def fetch_user_activities(user, reading_user, params) do
625 user
626 |> fetch_activities_for_user(reading_user, params)
627 |> Enum.reverse()
628 end
629
630 defp fetch_activities_for_user(user, reading_user, params) do
631 params =
632 params
633 |> Map.put(:type, ["Create", "Announce"])
634 |> Map.put(:user, reading_user)
635 |> Map.put(:actor_id, user.ap_id)
636 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
637
638 params =
639 if User.blocks?(reading_user, user) do
640 params
641 else
642 params
643 |> Map.put(:blocking_user, reading_user)
644 |> Map.put(:muting_user, reading_user)
645 end
646
647 pagination_type = Map.get(params, :pagination_type) || :keyset
648
649 %{
650 godmode: params[:godmode],
651 reading_user: reading_user
652 }
653 |> user_activities_recipients()
654 |> fetch_activities(params, pagination_type)
655 end
656
657 def fetch_statuses(reading_user, %{total: true} = params) do
658 result = fetch_activities_for_reading_user(reading_user, params)
659 Keyword.put(result, :items, Enum.reverse(result[:items]))
660 end
661
662 def fetch_statuses(reading_user, params) do
663 reading_user
664 |> fetch_activities_for_reading_user(params)
665 |> Enum.reverse()
666 end
667
668 defp fetch_activities_for_reading_user(reading_user, params) do
669 params = Map.put(params, :type, ["Create", "Announce"])
670
671 %{
672 godmode: params[:godmode],
673 reading_user: reading_user
674 }
675 |> user_activities_recipients()
676 |> fetch_activities(params, :offset)
677 end
678
679 defp user_activities_recipients(%{godmode: true}), do: []
680
681 defp user_activities_recipients(%{reading_user: reading_user}) do
682 if reading_user do
683 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
684 else
685 [Constants.as_public()]
686 end
687 end
688
689 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
690 raise "Can't use the child object without preloading!"
691 end
692
693 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
694 from(
695 [activity, object] in query,
696 where:
697 fragment(
698 "?->>'type' != ? or ?->>'actor' != ?",
699 activity.data,
700 "Announce",
701 object.data,
702 ^actor
703 )
704 )
705 end
706
707 defp restrict_announce_object_actor(query, _), do: query
708
709 defp restrict_since(query, %{since_id: ""}), do: query
710
711 defp restrict_since(query, %{since_id: since_id}) do
712 from(activity in query, where: activity.id > ^since_id)
713 end
714
715 defp restrict_since(query, _), do: query
716
717 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
718 raise_on_missing_preload()
719 end
720
721 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
722 from(
723 [_activity, object] in query,
724 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
725 )
726 end
727
728 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
729 restrict_embedded_tag_any(query, %{tag: tag})
730 end
731
732 defp restrict_embedded_tag_all(query, _), do: query
733
734 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
735 raise_on_missing_preload()
736 end
737
738 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
739 from(
740 [_activity, object] in query,
741 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
742 )
743 end
744
745 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
746 restrict_embedded_tag_any(query, %{tag: [tag]})
747 end
748
749 defp restrict_embedded_tag_any(query, _), do: query
750
751 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
752 raise_on_missing_preload()
753 end
754
755 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
756 from(
757 [_activity, object] in query,
758 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
759 )
760 end
761
762 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
763 when is_binary(tag_reject) do
764 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
765 end
766
767 defp restrict_embedded_tag_reject_any(query, _), do: query
768
769 defp object_ids_query_for_tags(tags) do
770 from(hto in "hashtags_objects")
771 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
772 |> where([hto, ht], ht.name in ^tags)
773 |> select([hto], hto.object_id)
774 |> distinct([hto], true)
775 end
776
777 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
778 raise_on_missing_preload()
779 end
780
781 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
782 restrict_hashtag_any(query, %{tag: single_tag})
783 end
784
785 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
786 from(
787 [_activity, object] in query,
788 where:
789 fragment(
790 """
791 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
792 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
793 AND hashtags_objects.object_id = ?) @> ?
794 """,
795 ^tags,
796 object.id,
797 ^tags
798 )
799 )
800 end
801
802 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
803 restrict_hashtag_all(query, %{tag_all: [tag]})
804 end
805
806 defp restrict_hashtag_all(query, _), do: query
807
808 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
809 raise_on_missing_preload()
810 end
811
812 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
813 hashtag_ids =
814 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
815 |> Repo.all()
816
817 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
818 from(
819 [_activity, object] in query,
820 join: hto in "hashtags_objects",
821 on: hto.object_id == object.id,
822 where: hto.hashtag_id in ^hashtag_ids,
823 distinct: [desc: object.id],
824 order_by: [desc: object.id]
825 )
826 end
827
828 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
829 restrict_hashtag_any(query, %{tag: [tag]})
830 end
831
832 defp restrict_hashtag_any(query, _), do: query
833
834 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
835 raise_on_missing_preload()
836 end
837
838 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
839 from(
840 [_activity, object] in query,
841 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
842 )
843 end
844
845 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
846 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
847 end
848
849 defp restrict_hashtag_reject_any(query, _), do: query
850
851 defp raise_on_missing_preload do
852 raise "Can't use the child object without preloading!"
853 end
854
855 defp restrict_recipients(query, [], _user), do: query
856
857 defp restrict_recipients(query, recipients, nil) do
858 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
859 end
860
861 defp restrict_recipients(query, recipients, user) do
862 from(
863 activity in query,
864 where: fragment("? && ?", ^recipients, activity.recipients),
865 or_where: activity.actor == ^user.ap_id
866 )
867 end
868
869 defp restrict_local(query, %{local_only: true}) do
870 from(activity in query, where: activity.local == true)
871 end
872
873 defp restrict_local(query, _), do: query
874
875 defp restrict_remote(query, %{remote: true}) do
876 from(activity in query, where: activity.local == false)
877 end
878
879 defp restrict_remote(query, _), do: query
880
881 defp restrict_actor(query, %{actor_id: actor_id}) do
882 from(activity in query, where: activity.actor == ^actor_id)
883 end
884
885 defp restrict_actor(query, _), do: query
886
887 defp restrict_type(query, %{type: type}) when is_binary(type) do
888 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
889 end
890
891 defp restrict_type(query, %{type: type}) do
892 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
893 end
894
895 defp restrict_type(query, _), do: query
896
897 defp restrict_state(query, %{state: state}) do
898 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
899 end
900
901 defp restrict_state(query, _), do: query
902
903 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
904 from(
905 [_activity, object] in query,
906 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
907 )
908 end
909
910 defp restrict_favorited_by(query, _), do: query
911
912 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
913 raise "Can't use the child object without preloading!"
914 end
915
916 defp restrict_media(query, %{only_media: true}) do
917 from(
918 [activity, object] in query,
919 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
920 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
921 )
922 end
923
924 defp restrict_media(query, _), do: query
925
926 defp restrict_replies(query, %{exclude_replies: true}) do
927 from(
928 [_activity, object] in query,
929 where: fragment("?->>'inReplyTo' is null", object.data)
930 )
931 end
932
933 defp restrict_replies(query, %{
934 reply_filtering_user: %User{} = user,
935 reply_visibility: "self"
936 }) do
937 from(
938 [activity, object] in query,
939 where:
940 fragment(
941 "?->>'inReplyTo' is null OR ? = ANY(?)",
942 object.data,
943 ^user.ap_id,
944 activity.recipients
945 )
946 )
947 end
948
949 defp restrict_replies(query, %{
950 reply_filtering_user: %User{} = user,
951 reply_visibility: "following"
952 }) do
953 from(
954 [activity, object] in query,
955 where:
956 fragment(
957 """
958 ?->>'type' != 'Create' -- This isn't a Create
959 OR ?->>'inReplyTo' is null -- this isn't a reply
960 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
961 -- unless they are the author (because authors
962 -- are also part of the recipients). This leads
963 -- to a bug that self-replies by friends won't
964 -- show up.
965 OR ? = ? -- The actor is us
966 """,
967 activity.data,
968 object.data,
969 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
970 activity.recipients,
971 activity.actor,
972 activity.actor,
973 ^user.ap_id
974 )
975 )
976 end
977
978 defp restrict_replies(query, _), do: query
979
980 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
981 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
982 end
983
984 defp restrict_reblogs(query, _), do: query
985
986 defp restrict_muted(query, %{with_muted: true}), do: query
987
988 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
989 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
990
991 query =
992 from([activity] in query,
993 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
994 where:
995 fragment(
996 "not (?->'to' \\?| ?) or ? = ?",
997 activity.data,
998 ^mutes,
999 activity.actor,
1000 ^user.ap_id
1001 )
1002 )
1003
1004 unless opts[:skip_preload] do
1005 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1006 else
1007 query
1008 end
1009 end
1010
1011 defp restrict_muted(query, _), do: query
1012
1013 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1014 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1015 domain_blocks = user.domain_blocks || []
1016
1017 following_ap_ids = User.get_friends_ap_ids(user)
1018
1019 query =
1020 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1021
1022 from(
1023 [activity, object: o] in query,
1024 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1025 where:
1026 fragment(
1027 "((not (? && ?)) or ? = ?)",
1028 activity.recipients,
1029 ^blocked_ap_ids,
1030 activity.actor,
1031 ^user.ap_id
1032 ),
1033 where:
1034 fragment(
1035 "recipients_contain_blocked_domains(?, ?) = false",
1036 activity.recipients,
1037 ^domain_blocks
1038 ),
1039 where:
1040 fragment(
1041 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1042 activity.data,
1043 activity.data,
1044 ^blocked_ap_ids
1045 ),
1046 where:
1047 fragment(
1048 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1049 activity.actor,
1050 ^domain_blocks,
1051 activity.actor,
1052 ^following_ap_ids
1053 ),
1054 where:
1055 fragment(
1056 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1057 o.data,
1058 ^domain_blocks,
1059 o.data,
1060 ^following_ap_ids
1061 )
1062 )
1063 end
1064
1065 defp restrict_blocked(query, _), do: query
1066
1067 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1068 from(
1069 activity in query,
1070 where:
1071 fragment(
1072 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1073 activity.data,
1074 ^[Constants.as_public()]
1075 )
1076 )
1077 end
1078
1079 defp restrict_unlisted(query, _), do: query
1080
1081 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1082 from(
1083 [activity, object: o] in query,
1084 where:
1085 fragment(
1086 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1087 activity.data,
1088 activity.data,
1089 activity.data,
1090 ^ids
1091 )
1092 )
1093 end
1094
1095 defp restrict_pinned(query, _), do: query
1096
1097 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1098 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1099
1100 from(
1101 activity in query,
1102 where:
1103 fragment(
1104 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1105 activity.data,
1106 activity.actor,
1107 ^muted_reblogs
1108 )
1109 )
1110 end
1111
1112 defp restrict_muted_reblogs(query, _), do: query
1113
1114 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1115 from(
1116 activity in query,
1117 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1118 )
1119 end
1120
1121 defp restrict_instance(query, _), do: query
1122
1123 defp restrict_filtered(query, %{user: %User{} = user}) do
1124 case Filter.compose_regex(user) do
1125 nil ->
1126 query
1127
1128 regex ->
1129 from([activity, object] in query,
1130 where:
1131 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1132 activity.actor == ^user.ap_id
1133 )
1134 end
1135 end
1136
1137 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1138 restrict_filtered(query, %{user: user})
1139 end
1140
1141 defp restrict_filtered(query, _), do: query
1142
1143 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1144
1145 defp exclude_poll_votes(query, _) do
1146 if has_named_binding?(query, :object) do
1147 from([activity, object: o] in query,
1148 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1149 )
1150 else
1151 query
1152 end
1153 end
1154
1155 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1156
1157 defp exclude_chat_messages(query, _) do
1158 if has_named_binding?(query, :object) do
1159 from([activity, object: o] in query,
1160 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1161 )
1162 else
1163 query
1164 end
1165 end
1166
1167 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1168
1169 defp exclude_invisible_actors(query, _opts) do
1170 invisible_ap_ids =
1171 User.Query.build(%{invisible: true, select: [:ap_id]})
1172 |> Repo.all()
1173 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1174
1175 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1176 end
1177
1178 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1179 from(activity in query, where: activity.id != ^id)
1180 end
1181
1182 defp exclude_id(query, _), do: query
1183
1184 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1185
1186 defp maybe_preload_objects(query, _) do
1187 query
1188 |> Activity.with_preloaded_object()
1189 end
1190
1191 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1192
1193 defp maybe_preload_bookmarks(query, opts) do
1194 query
1195 |> Activity.with_preloaded_bookmark(opts[:user])
1196 end
1197
1198 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1199 query
1200 |> Activity.with_preloaded_report_notes()
1201 end
1202
1203 defp maybe_preload_report_notes(query, _), do: query
1204
1205 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1206
1207 defp maybe_set_thread_muted_field(query, opts) do
1208 query
1209 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1210 end
1211
1212 defp maybe_order(query, %{order: :desc}) do
1213 query
1214 |> order_by(desc: :id)
1215 end
1216
1217 defp maybe_order(query, %{order: :asc}) do
1218 query
1219 |> order_by(asc: :id)
1220 end
1221
1222 defp maybe_order(query, _), do: query
1223
1224 defp normalize_fetch_activities_query_opts(opts) do
1225 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1226 case opts[key] do
1227 value when is_bitstring(value) ->
1228 Map.put(opts, key, Hashtag.normalize_name(value))
1229
1230 value when is_list(value) ->
1231 normalized_value =
1232 value
1233 |> Enum.map(&Hashtag.normalize_name/1)
1234 |> Enum.uniq()
1235
1236 Map.put(opts, key, normalized_value)
1237
1238 _ ->
1239 opts
1240 end
1241 end)
1242 end
1243
1244 defp fetch_activities_query_ap_ids_ops(opts) do
1245 source_user = opts[:muting_user]
1246 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1247
1248 ap_id_relationships =
1249 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1250 [:block | ap_id_relationships]
1251 else
1252 ap_id_relationships
1253 end
1254
1255 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1256
1257 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1258 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1259
1260 restrict_muted_reblogs_opts =
1261 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1262
1263 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1264 end
1265
1266 def fetch_activities_query(recipients, opts \\ %{}) do
1267 opts = normalize_fetch_activities_query_opts(opts)
1268
1269 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1270 fetch_activities_query_ap_ids_ops(opts)
1271
1272 config = %{
1273 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1274 }
1275
1276 query =
1277 Activity
1278 |> maybe_preload_objects(opts)
1279 |> maybe_preload_bookmarks(opts)
1280 |> maybe_preload_report_notes(opts)
1281 |> maybe_set_thread_muted_field(opts)
1282 |> maybe_order(opts)
1283 |> restrict_recipients(recipients, opts[:user])
1284 |> restrict_replies(opts)
1285 |> restrict_since(opts)
1286 |> restrict_local(opts)
1287 |> restrict_remote(opts)
1288 |> restrict_actor(opts)
1289 |> restrict_type(opts)
1290 |> restrict_state(opts)
1291 |> restrict_favorited_by(opts)
1292 |> restrict_blocked(restrict_blocked_opts)
1293 |> restrict_muted(restrict_muted_opts)
1294 |> restrict_filtered(opts)
1295 |> restrict_media(opts)
1296 |> restrict_visibility(opts)
1297 |> restrict_thread_visibility(opts, config)
1298 |> restrict_reblogs(opts)
1299 |> restrict_pinned(opts)
1300 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1301 |> restrict_instance(opts)
1302 |> restrict_announce_object_actor(opts)
1303 |> restrict_filtered(opts)
1304 |> Activity.restrict_deactivated_users()
1305 |> exclude_poll_votes(opts)
1306 |> exclude_chat_messages(opts)
1307 |> exclude_invisible_actors(opts)
1308 |> exclude_visibility(opts)
1309
1310 if Config.feature_enabled?(:improved_hashtag_timeline) do
1311 query
1312 |> restrict_hashtag_any(opts)
1313 |> restrict_hashtag_all(opts)
1314 |> restrict_hashtag_reject_any(opts)
1315 else
1316 query
1317 |> restrict_embedded_tag_any(opts)
1318 |> restrict_embedded_tag_all(opts)
1319 |> restrict_embedded_tag_reject_any(opts)
1320 end
1321 end
1322
1323 @doc """
1324 Fetch favorites activities of user with order by sort adds to favorites
1325 """
1326 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1327 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1328 user.ap_id
1329 |> Activity.Queries.by_actor()
1330 |> Activity.Queries.by_type("Like")
1331 |> Activity.with_joined_object()
1332 |> Object.with_joined_activity()
1333 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1334 |> order_by([like, _, _], desc_nulls_last: like.id)
1335 |> Pagination.fetch_paginated(
1336 Map.merge(params, %{skip_order: true}),
1337 pagination
1338 )
1339 end
1340
1341 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1342 Enum.map(activities, fn
1343 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1344 if Enum.any?(bcc, &(&1 in list_memberships)) do
1345 update_in(activity.data["cc"], &[user_ap_id | &1])
1346 else
1347 activity
1348 end
1349
1350 activity ->
1351 activity
1352 end)
1353 end
1354
1355 defp maybe_update_cc(activities, _, _), do: activities
1356
1357 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1358 from(activity in query,
1359 where:
1360 fragment("? && ?", activity.recipients, ^recipients) or
1361 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1362 ^Constants.as_public() in activity.recipients)
1363 )
1364 end
1365
1366 def fetch_activities_bounded(
1367 recipients,
1368 recipients_with_public,
1369 opts \\ %{},
1370 pagination \\ :keyset
1371 ) do
1372 fetch_activities_query([], opts)
1373 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1374 |> Pagination.fetch_paginated(opts, pagination)
1375 |> Enum.reverse()
1376 end
1377
1378 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1379 def upload(file, opts \\ []) do
1380 with {:ok, data} <- Upload.store(file, opts) do
1381 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1382
1383 Repo.insert(%Object{data: obj_data})
1384 end
1385 end
1386
1387 @spec get_actor_url(any()) :: binary() | nil
1388 defp get_actor_url(url) when is_binary(url), do: url
1389 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1390
1391 defp get_actor_url(url) when is_list(url) do
1392 url
1393 |> List.first()
1394 |> get_actor_url()
1395 end
1396
1397 defp get_actor_url(_url), do: nil
1398
1399 defp normalize_image(%{"url" => url}) do
1400 %{
1401 "type" => "Image",
1402 "url" => [%{"href" => url}]
1403 }
1404 end
1405
1406 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1407 defp normalize_image(_), do: nil
1408
1409 defp object_to_user_data(data) do
1410 fields =
1411 data
1412 |> Map.get("attachment", [])
1413 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1414 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1415
1416 emojis =
1417 data
1418 |> Map.get("tag", [])
1419 |> Enum.filter(fn
1420 %{"type" => "Emoji"} -> true
1421 _ -> false
1422 end)
1423 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1424 {String.trim(name, ":"), url}
1425 end)
1426
1427 is_locked = data["manuallyApprovesFollowers"] || false
1428 capabilities = data["capabilities"] || %{}
1429 accepts_chat_messages = capabilities["acceptsChatMessages"]
1430 data = Transmogrifier.maybe_fix_user_object(data)
1431 is_discoverable = data["discoverable"] || false
1432 invisible = data["invisible"] || false
1433 actor_type = data["type"] || "Person"
1434
1435 featured_address = data["featured"]
1436 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1437
1438 public_key =
1439 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1440 data["publicKey"]["publicKeyPem"]
1441 else
1442 nil
1443 end
1444
1445 shared_inbox =
1446 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1447 data["endpoints"]["sharedInbox"]
1448 else
1449 nil
1450 end
1451
1452 user_data = %{
1453 ap_id: data["id"],
1454 uri: get_actor_url(data["url"]),
1455 ap_enabled: true,
1456 banner: normalize_image(data["image"]),
1457 fields: fields,
1458 emoji: emojis,
1459 is_locked: is_locked,
1460 is_discoverable: is_discoverable,
1461 invisible: invisible,
1462 avatar: normalize_image(data["icon"]),
1463 name: data["name"],
1464 follower_address: data["followers"],
1465 following_address: data["following"],
1466 featured_address: featured_address,
1467 bio: data["summary"] || "",
1468 actor_type: actor_type,
1469 also_known_as: Map.get(data, "alsoKnownAs", []),
1470 public_key: public_key,
1471 inbox: data["inbox"],
1472 shared_inbox: shared_inbox,
1473 accepts_chat_messages: accepts_chat_messages,
1474 pinned_objects: pinned_objects
1475 }
1476
1477 # nickname can be nil because of virtual actors
1478 if data["preferredUsername"] do
1479 Map.put(
1480 user_data,
1481 :nickname,
1482 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1483 )
1484 else
1485 Map.put(user_data, :nickname, nil)
1486 end
1487 end
1488
1489 def fetch_follow_information_for_user(user) do
1490 with {:ok, following_data} <-
1491 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1492 {:ok, hide_follows} <- collection_private(following_data),
1493 {:ok, followers_data} <-
1494 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1495 {:ok, hide_followers} <- collection_private(followers_data) do
1496 {:ok,
1497 %{
1498 hide_follows: hide_follows,
1499 follower_count: normalize_counter(followers_data["totalItems"]),
1500 following_count: normalize_counter(following_data["totalItems"]),
1501 hide_followers: hide_followers
1502 }}
1503 else
1504 {:error, _} = e -> e
1505 e -> {:error, e}
1506 end
1507 end
1508
1509 defp normalize_counter(counter) when is_integer(counter), do: counter
1510 defp normalize_counter(_), do: 0
1511
1512 def maybe_update_follow_information(user_data) do
1513 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1514 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1515 {_, true} <-
1516 {:collections_available,
1517 !!(user_data[:following_address] && user_data[:follower_address])},
1518 {:ok, info} <-
1519 fetch_follow_information_for_user(user_data) do
1520 info = Map.merge(user_data[:info] || %{}, info)
1521
1522 user_data
1523 |> Map.put(:info, info)
1524 else
1525 {:user_type_check, false} ->
1526 user_data
1527
1528 {:collections_available, false} ->
1529 user_data
1530
1531 {:enabled, false} ->
1532 user_data
1533
1534 e ->
1535 Logger.error(
1536 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1537 )
1538
1539 user_data
1540 end
1541 end
1542
1543 defp collection_private(%{"first" => %{"type" => type}})
1544 when type in ["CollectionPage", "OrderedCollectionPage"],
1545 do: {:ok, false}
1546
1547 defp collection_private(%{"first" => first}) do
1548 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1549 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1550 {:ok, false}
1551 else
1552 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1553 {:error, _} = e -> e
1554 e -> {:error, e}
1555 end
1556 end
1557
1558 defp collection_private(_data), do: {:ok, true}
1559
1560 def user_data_from_user_object(data) do
1561 with {:ok, data} <- MRF.filter(data) do
1562 {:ok, object_to_user_data(data)}
1563 else
1564 e -> {:error, e}
1565 end
1566 end
1567
1568 def fetch_and_prepare_user_from_ap_id(ap_id) do
1569 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1570 {:ok, data} <- user_data_from_user_object(data) do
1571 {:ok, maybe_update_follow_information(data)}
1572 else
1573 # If this has been deleted, only log a debug and not an error
1574 {:error, "Object has been deleted" = e} ->
1575 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1576 {:error, e}
1577
1578 {:error, {:reject, reason} = e} ->
1579 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1580 {:error, e}
1581
1582 {:error, e} ->
1583 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1584 {:error, e}
1585 end
1586 end
1587
1588 def maybe_handle_clashing_nickname(data) do
1589 with nickname when is_binary(nickname) <- data[:nickname],
1590 %User{} = old_user <- User.get_by_nickname(nickname),
1591 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1592 Logger.info(
1593 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1594 data[:ap_id]
1595 }, renaming."
1596 )
1597
1598 old_user
1599 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1600 |> User.update_and_set_cache()
1601 else
1602 {:ap_id_comparison, true} ->
1603 Logger.info(
1604 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1605 )
1606
1607 _ ->
1608 nil
1609 end
1610 end
1611
1612 def pin_data_from_featured_collection(%{
1613 "type" => type,
1614 "orderedItems" => objects
1615 })
1616 when type in ["OrderedCollection", "Collection"] do
1617 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1618 end
1619
1620 def fetch_and_prepare_featured_from_ap_id(nil) do
1621 {:ok, %{}}
1622 end
1623
1624 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1625 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1626 {:ok, pin_data_from_featured_collection(data)}
1627 else
1628 e ->
1629 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1630 {:ok, %{}}
1631 end
1632 end
1633
1634 def pinned_fetch_task(nil), do: nil
1635
1636 def pinned_fetch_task(%{pinned_objects: pins}) do
1637 if Enum.all?(pins, fn {ap_id, _} ->
1638 Object.get_cached_by_ap_id(ap_id) ||
1639 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1640 end) do
1641 :ok
1642 else
1643 :error
1644 end
1645 end
1646
1647 def make_user_from_ap_id(ap_id) do
1648 user = User.get_cached_by_ap_id(ap_id)
1649
1650 if user && !User.ap_enabled?(user) do
1651 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1652 else
1653 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1654 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1655
1656 if user do
1657 user
1658 |> User.remote_user_changeset(data)
1659 |> User.update_and_set_cache()
1660 else
1661 maybe_handle_clashing_nickname(data)
1662
1663 data
1664 |> User.remote_user_changeset()
1665 |> Repo.insert()
1666 |> User.set_cache()
1667 end
1668 end
1669 end
1670 end
1671
1672 def make_user_from_nickname(nickname) do
1673 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1674 make_user_from_ap_id(ap_id)
1675 else
1676 _e -> {:error, "No AP id in WebFinger"}
1677 end
1678 end
1679
1680 # filter out broken threads
1681 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1682 entire_thread_visible_for_user?(activity, user)
1683 end
1684
1685 # do post-processing on a specific activity
1686 def contain_activity(%Activity{} = activity, %User{} = user) do
1687 contain_broken_threads(activity, user)
1688 end
1689
1690 def fetch_direct_messages_query do
1691 Activity
1692 |> restrict_type(%{type: "Create"})
1693 |> restrict_visibility(%{visibility: "direct"})
1694 |> order_by([activity], asc: activity.id)
1695 end
1696 end