a35ba1fad31d3f4c95bfa4807319412110b478ab
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw values of the
# corresponding keys (defaulting to `[]`) and `recipients` is the concatenation
# of `to`, `cc` and `bcc`. For `Create` activities the actor is appended as well
# and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing actor must contribute nothing. The previous `[]` default was
  # wrapped into `[[]]`, which put an empty list into the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
55
# Returns `true` when no actor is given, or when the actor's AP id resolves
# (through the user cache lookup) to a user whose `is_active` flag is `true`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
64
# Checks the embedded object's textual content against the configured
# `[:instance, :remote_limit]` character limit.
#
# Only binary content is measured. Anything else (no object, no content, or
# non-string content) passes here, because `String.length/1` would raise on
# a non-binary value.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
71
@doc """
Increases the actor's note count when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
75
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
79
# For a `Create` whose object carries `inReplyTo`, bumps the replies counter of
# the replied-to object, but only when the new object is public (otherwise
# returns `nil`). Any other input is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
90
# Types that are stored as plain objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Everything else is inserted as an activity. `meta` must contain `:local`.
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
115
@doc """
Inserts an activity map, unless `Activity.normalize/1` already finds one.

The map goes through, in order: default filling, the actor-active check
(skipped when `bypass_actor_check` is truthy), the remote content limit, MRF
filtering, the containment check and finally object and activity insertion.
With `fake` set, nothing is stored and an `%Activity{}` with the id
`"pleroma:fakeid"` is returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(data["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _, _} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data),
       {:ok, inserted} <- insert_activity_with_expiration(data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # An activity for this map already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake inserts stop before anything is written.
    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        id: "pleroma:fakeid",
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:containment, _} = failure ->
      failure

    {:error, _} = failure ->
      failure
  end
end
168
# Inserts the activity row and, on success, hands it to
# `maybe_create_activity_expiration/1`. Insert failures are returned untouched.
defp insert_activity_with_expiration(data, local, recipients) do
  new_activity = %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }

  case Repo.insert(new_activity) do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
181
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
190
# When the activity data carries a `DateTime` under "expires_at", enqueues a
# `PurgeExpiredActivity` job for it. Returns `{:ok, activity}` on success (or
# when there is nothing to schedule) and the enqueue result otherwise.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`; if either lookup does not yield
# the expected shape, that lookup's result is returned as is.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    other ->
      other
  end
end
212
# Force-reloads and returns the participations of a successfully resolved
# conversation; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
220
@doc """
Preloads the `:user` association of the given participations and streams them
on the `"participation"` topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
228
# Streams the participations of the conversation behind the object's context,
# but only if `user` can still see a direct activity in that context.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
248
# Streams `Create`, `Announce` and `Delete` activities to the topics computed
# by `Topics.get_activity_topics/1`; every other activity is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
261
@doc """
Runs `do_create/2` inside a database transaction.

Returns the value produced inside the transaction when it commits, or the
transaction's own result otherwise.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
268
# Builds the Create data and inserts it. Counters, notifications, streaming
# and federation are skipped for fake inserts, and everything after the
# replies counter is skipped when running in the `:benchmark` env.
# `{:error, _}` results roll the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only an explicit `false` turns the flag off
  local = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  create_data = make_create_data(base, extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
301
@doc """
Builds a Listen activity from `params`, inserts it, then notifies, streams and
federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only an explicit `false` turns the flag off
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  with {:ok, activity} <- base |> make_listen_data(extra) |> insert(local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
321
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps the result of a
committed transaction.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
330
# Marks the latest follow activity as "cancelled" and publishes the matching
# unfollow activity. Returns `nil` when there is no follow to undo and rolls
# the transaction back on `{:error, _}`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
344
@doc """
Runs `do_flag/1` inside a database transaction and unwraps the result of a
committed transaction.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
351
# Inserts a Flag (report) activity, federates a stripped copy of it and mails
# the report to every superuser that has an email address and is not the
# reporter. `{:error, _}` results roll the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only an explicit `false` turns these flags off
  local = params[:local] != false
  forward = params[:forward] != false

  # The reported account is addressed in "cc" only when forwarding.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.filter(fn user -> user.ap_id != actor and not is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
394
@doc """
Publishes a Move activity from `origin` to `target` and enqueues the
`"move_following"` background job.

`target.also_known_as` must contain `origin.ap_id`; otherwise an error tuple
is returned and nothing is inserted.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
420
@doc """
Builds the query for the `Create` activities of `context`, newest first.

The result is limited to activities addressed to the public collection, or,
when `opts[:user]` is set, also to that user and the users they follow.
"""
def fetch_activities_for_context_query(context, opts) do
  user = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
451
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
458
@doc """
Returns the id of the newest direct-visibility `Create` activity in `context`
that passes the filters implied by `opts`, or `nil` when there is none.

`opts` may be a map or a keyword list. `skip_preload: true` is used unless
`opts` overrides it.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # `Map.merge/2` raises on a keyword list even though the spec allows one,
  # so normalise to a map first (a no-op for maps).
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
469
# Paginates `query` with `:skip_extra_order` forced on.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
477
@doc """
Fetches a page of activities addressed to `recipients` or to any list
membership of `opts[:user]`.

The fetched page is reversed and passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> fetch_paginated_optimized(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
486
@doc """
Fetches a page of activities addressed to the public collection, ignoring any
`:user` in `opts`. Unlisted activities are dropped only when
`opts[:restrict_unlisted]` is `true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  fetch_paginated_optimized(query, anonymous_opts, pagination)
end
496
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  restricted_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(restricted_opts, pagination)
end
503
@valid_visibilities ~w[direct unlisted public private]

# Keeps only activities whose computed visibility equals `opts.visibility`
# (a single value) or is one of them (a list).
#
# An unknown visibility is logged and the query is returned unchanged, the
# same way `exclude_visibility/2` behaves. These branches used to return the
# result of `Logger.error/1`, so the caller's pipeline received a non-query
# value. A `nil` visibility is treated as "no restriction requested".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
540
# Drops activities whose computed visibility equals `exclude_visibilities`
# (a single value) or is one of them (a list). An unknown visibility is logged
# and the query is returned unchanged; `nil` or a missing key is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
583
# Applies the `thread_visibility` SQL function for the given user, unless
# thread containment is skipped via the third argument or via the user's own
# `skip_thread_containment` flag. Without a user the query is left untouched.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
598
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type, and reverses the list returned by
`fetch_activities/2`.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
613
@doc """
Fetches the `Create`/`Announce` activities of `user` as seen by `reading_user`.

With `total: true` in `params` the result is a keyword list whose `:items` are
reversed; otherwise the fetched list itself is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
627
# Shared fetch for `fetch_user_activities/3`: restricts to Create/Announce by
# `user`, passes the user's pinned object ids along and, unless the reader
# blocks the user, applies the reader's block and mute filters.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
654
@doc """
Fetches `Create`/`Announce` activities visible to `reading_user` using offset
pagination.

With `total: true` in `params` the result is a keyword list whose `:items` are
reversed; otherwise the fetched list itself is reversed.
"""
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])
  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
665
# Shared fetch for `fetch_statuses/2`: Create/Announce only, offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
676
# Recipient list used when reading a user's activities: empty in godmode,
# otherwise the public collection plus, for a logged-in reader, the reader and
# everyone they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
686
# Drops Announces whose object was authored by the given user. The filter
# reads the joined object, so it raises when preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
706
# Keeps activities with an id greater than `since_id`; an empty string or a
# missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
714
# Requires the object's embedded "tag" array to contain every given tag
# (jsonb `?&`). A single binary tag is delegated to the "any" variant.
# Raises when preloading is skipped, because the joined object is needed.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
731
# Requires the object's embedded "tag" array to contain at least one of the
# given tags (jsonb `?|`). A single binary tag is wrapped into a list.
# Raises when preloading is skipped, because the joined object is needed.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
748
# Drops objects whose embedded "tag" array contains any of the rejected tags.
# A single binary tag is wrapped into a list. Raises when preloading is
# skipped, because the joined object is needed.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
766
# Query selecting the distinct ids of objects linked, via "hashtags_objects",
# to a hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
774
# Requires the object to be linked to every given hashtag. A one-element list
# is delegated to `restrict_hashtag_any/2` and a binary is wrapped into a list.
# Raises when preloading is skipped, because the joined object is needed.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
805
# Requires the object to be linked to at least one of the given hashtags.
# The hashtag ids are resolved with a separate query first. Raises when
# preloading is skipped, because the joined object is needed.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  ids_query = from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
  hashtag_ids = Repo.all(ids_query)

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: link in "hashtags_objects",
    on: link.object_id == object.id,
    where: link.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
831
# Drops objects linked to any of the rejected hashtags. A single binary tag is
# wrapped into a list. Raises when preloading is skipped, because the joined
# object is needed.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
848
# Raised by filters that read the joined object when preloading is skipped.
defp raise_on_missing_preload, do: raise("Can't use the child object without preloading!")
852
# Keeps activities whose recipients overlap with `recipients`. With a user, an
# `or_where` on the activity actor being that user is added as well. An empty
# recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
866
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
872
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
878
# With `actor_id` given, keeps only activities by that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
884
# Restricts by activity type: equality for a single binary type, `= ANY(...)`
# for any other value passed under `:type`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
894
# With `state` given, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
900
# With `favorited_by` given, keeps only objects whose "likes" contain that AP id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
909
# With `only_media: true`, keeps Create activities whose object "attachment"
# is not an empty list. Raises when preloading is skipped, because the joined
# object is needed.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
923
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and activities addressed to
#     the filtering user;
#   * `reply_visibility: "following"` applies the conditions spelled out in the
#     SQL comments below.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
977
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
983
# Hides activities by muted users and activities addressed ("to") to muted
# users unless the muting user authored them. Unless preloading is skipped,
# rows that have a `thread_mute` user id are dropped as well.
# `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1010
# Applies the blocking user's user and domain blocks. The object join is added
# when the query does not already carry an `:object` named binding.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1077
# Unless `[:activitypub, :blockers_visible]` is exactly `true`, hides
# activities authored by users who block the given user, as well as boosts
# addressed ("to") to such users.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

defp restrict_blockers_visibility(query, _), do: query
1102
# With `restrict_unlisted: true`, drops activities whose "cc" contains the
# public collection address.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public_addresses = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public_addresses
    )
  )
end

defp restrict_unlisted(query, _), do: query
1116
# With `pinned: true`, keeps only Create activities whose object id is one of
# `pinned_object_ids`. The object id is read from the embedded object when
# "object" is a map and from the plain "object" value otherwise.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      activity.data,
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _), do: query
1132
# Drops Announce activities authored by accounts whose reblogs `user` muted.
# A preloaded list in `opts[:reblog_muted_users_ap_ids]` takes precedence over
# loading it with `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_actors =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_actors
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1149
# Keeps only activities whose actor URL has `instance` as its third
# "/"-separated segment (the host part of the URL).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
1158
# Applies the user's content filters: activities whose object content matches
# the regex from `Filter.compose_regex/1` are dropped, except the user's own.
# The object is addressed positionally as the second binding of the query.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

# Falls back to the blocking user when no `:user` option is given.
defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
1178
# Drops activities whose object type is "Answer" unless
# `include_poll_votes: true` is set. Without an `:object` named binding the
# query is returned unchanged.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1190
# Drops activities whose object type is "ChatMessage" unless
# `include_chat_messages: true` is set. Without an `:object` named binding the
# query is returned unchanged.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [_activity, object: o],
        fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
      )

    false ->
      query
  end
end
1202
# Drops activities authored by users flagged invisible, unless
# `invisible_actors: true` is set. The ap_ids of invisible users are loaded
# with a separate query and interpolated into the restriction.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  hidden_actor_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^hidden_actor_ids)
end
1213
# Leaves out the activity with the given id when `exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _), do: query
1219
# Preloads the object through `Activity.with_preloaded_object/1` unless
# `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1226
# Preloads the bookmark for `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1233
# Preloads report notes only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1240
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1247
# Orders by id when `:order` is `:desc` or `:asc`; any other value leaves the
# query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1259
# Normalizes the hashtag options (`:tag`, `:tag_all`, `:tag_reject`) with
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values are left untouched.
defp normalize_fetch_activities_query_opts(opts) do
  for key <- [:tag, :tag_all, :tag_reject], reduce: opts do
    acc ->
      case acc[key] do
        name when is_bitstring(name) ->
          Map.put(acc, key, Hashtag.normalize_name(name))

        names when is_list(names) ->
          unique_names =
            names
            |> Enum.map(&Hashtag.normalize_name/1)
            |> Enum.uniq()

          Map.put(acc, key, unique_names)

        _ ->
          acc
      end
  end
end
1279
# Loads the muting user's outgoing relationships in one call and returns the
# option maps for restrict_blocked, restrict_muted and restrict_muted_reblogs,
# in that order. Blocks are only preloaded when the blocking user equals the
# muting user. Values already present in `opts` are never overwritten.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(muting_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1301
@doc """
Builds the activity query for `recipients`, applying every restriction
selected by `opts`.

Hashtag options are normalized first and the muting user's relationships are
preloaded once for the block/mute restrictions. Hashtag filtering uses the
hashtag-based restrictions when the `:improved_hashtag_timeline` feature is
enabled and the embedded-tag restrictions otherwise.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # restrict_filtered/2 is applied exactly once: applying it a second time
  # only repeated the same regex predicate in the WHERE clause.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1359
@doc """
Fetches the activities favourited by `user`.

Results are ordered by the id of the Like activity, descending, and that id
is exposed as `pagination_id`. `skip_order: true` is passed to the paginator
together with `params`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1377
# Adds the user's ap_id to "cc" of every activity whose "bcc" contains one of
# the user's list memberships. With no memberships (or no user) the activities
# are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or a bare value; List.wrap/1 keeps the result a
        # proper list instead of producing `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1393
# Keeps activities addressed to any of `recipients`, plus activities that are
# addressed to any of `recipients_with_public` and also to the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1402
@doc """
Fetches a page of activities bounded by `recipients` and
`recipients_with_public`, returning the page in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1414
# Stores `file` through `Upload.store/2` and inserts an Object holding the
# returned data, with "actor" added when `opts[:actor]` is present. Any other
# result of the store step is returned as-is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1423
# Extracts a URL string from an actor's "url" value: a binary is returned
# as-is, a map yields its binary "href", a list is resolved through its first
# element, and everything else gives nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1435
# Converts an image value into the stored "Image" map. A map with "url" is
# wrapped, a list is resolved through its first element, anything else is nil.
defp normalize_image(image) do
  case image do
    %{"url" => url} ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    [first | _] ->
      normalize_image(first)

    _ ->
      nil
  end
end
1445
# Converts a remote actor document into the attribute map used to build a
# remote user.
#
# The document comes from another server, so "attachment" and "tag" entries
# are matched defensively: a missing/null value, a single object instead of a
# list, and entries without the expected keys are skipped instead of raising.
# The featured collection is fetched to fill `:pinned_objects`. `:nickname` is
# nil when the actor has no "preferredUsername" (virtual actors).
defp object_to_user_data(data) do
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Non-matching tags (not an Emoji, no icon url, non-binary name) are skipped
  # by the generator pattern and filter.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1525
@doc """
Fetches the remote following and follower collections of `user` and returns
`{:ok, info}` with the counters and whether each collection is hidden.

Failures are returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden,
      hide_follows: follows_hidden
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1545
# Returns integer counters unchanged and maps every other value to 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1548
@doc """
Adds remote follower/following information to `user_data` under `:info`.

Nothing is fetched unless external user synchronization is enabled, the actor
is a "Person" or "Service", and both collection addresses are present. On any
failure the error is logged and `user_data` is returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # The map built by object_to_user_data/1 stores the type under
  # :actor_type; :type is still accepted for callers that pass it.
  actor_type = user_data[:actor_type] || user_data[:type]
  has_collections = !!(user_data[:following_address] && user_data[:follower_address])

  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <- {:user_type_check, actor_type in ["Person", "Service"]},
       {:collections_available, true} <- {:collections_available, has_collections},
       {:ok, info} <- fetch_follow_information_for_user(user_data) do
    # NOTE(review): the fetched values are nested under :info as before;
    # confirm that the consumer of this map actually reads that key.
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, info))
  else
    {tag, false} when tag in [:enabled, :user_type_check, :collections_available] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data[:ap_id]} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1579
# Decides whether a remote collection is hidden.
#
# An inlined first page means the collection is visible. A referenced first
# page is fetched: a page result means visible, a 401/403 response means
# hidden, other failures are returned as errors. A collection without "first"
# is treated as hidden.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1596
@doc """
Runs the actor document through `MRF.filter/1` and converts the result into
user attributes. Any non-`{:ok, _}` filter result is wrapped in `{:error, _}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1604
@doc """
Fetches the actor at `ap_id` and returns `{:ok, user_data}` ready for a remote
user changeset, or `{:error, reason}`.

The log level depends on the failure: deleted objects are logged at debug,
rejections at info, everything else at error.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted actor is expected, so it is only worth a debug line
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = reason} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, reason}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1624
@doc """
Renames an existing user that holds the nickname in `data` but has a
different ap_id, by prefixing the old nickname with the old user's id.

Returns nil when there is no nickname or no clashing user.
"""
def maybe_handle_clashing_nickname(data) do
  with nick when is_binary(nick) <- data[:nickname],
       %User{} = holder <- User.get_by_nickname(nick),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == holder.ap_id} do
    Logger.info(
      "Found an old user for #{nick}, the old ap id is #{holder.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = "#{holder.id}.#{holder.nickname}"

    holder
    |> User.remote_user_changeset(%{nickname: renamed})
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1648
@doc """
Builds the pinned-objects map (`object ap_id => pinned-at timestamp`) from a
featured collection document.

Items are read from "orderedItems" or, failing that, "items". An item may be
an object map with a binary "id" or a bare id string; anything else is
skipped. A document that is not a collection yields an empty map, so
unexpected remote data cannot crash the caller.
"""
def pin_data_from_featured_collection(%{"type" => type} = collection)
    when type in ["OrderedCollection", "Collection"] do
  items = collection["orderedItems"] || collection["items"] || []

  items
  |> List.wrap()
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [object_ap_id]
    object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
    _ -> []
  end)
  |> Map.new(fn object_ap_id -> {object_ap_id, NaiveDateTime.utc_now()} end)
end

def pin_data_from_featured_collection(other) do
  Logger.error("Could not parse featured collection #{inspect(other)}")
  %{}
end
1656
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pins}`.

Always returns an `:ok` tuple: a nil address or a failed fetch gives an empty
map, and the fetch failure is logged.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    failure ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(failure)}")
      {:ok, %{}}
  end
end
1670
@doc """
Makes sure every pinned object in `pinned_objects` is available locally,
fetching the ones that are not cached.

Returns `:ok` when all pins are available and `:error` when at least one
could not be fetched. Returns nil for nil input.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  # Every pin is attempted before the outcome is judged: Enum.all?/2 would
  # stop at the first failure and leave the remaining pins unfetched.
  results =
    Enum.map(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if Enum.all?(results), do: :ok, else: :error
end
1683
@doc """
Creates or refreshes the remote user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched, its pinned objects are fetched in a separate
task, and the user is updated (when cached) or inserted (when new, after
resolving a clashing nickname). A failed fetch result is returned as-is.
"""
def make_user_from_ap_id(ap_id) do
  cached = User.get_cached_by_ap_id(ap_id)

  if cached && !User.ap_enabled?(cached) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    with {:ok, user_data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, _pid} = Task.start(fn -> pinned_fetch_task(user_data) end)

      case cached do
        missing when missing in [nil, false] ->
          maybe_handle_clashing_nickname(user_data)

          user_data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()

        existing ->
          existing
          |> User.remote_user_changeset(user_data)
          |> User.update_and_set_cache()
      end
    end
  end
end
1708
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user for
the returned ap_id. Returns `{:error, "No AP id in WebFinger"}` when the
lookup does not yield an ap_id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1716
# Filters out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1721
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1726
@doc """
Returns a query for Create activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1733 end