implement Move activities (#45)
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
55 {recipients, to, cc}
56 end
57
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
64 _ -> false
65 end
66 end
67
68 defp check_actor_can_insert(_), do: true
69
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
73 end
74
75 defp check_remote_limit(_), do: true
76
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
79 end
80
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
83 end
84
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
87 end
88
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
91 "type" => "Create"
92 }) do
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
95 end
96 end
97
98 defp increase_replies_count_if_reply(_create_data), do: :noop
99
100 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
101 @impl true
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
104 {:ok, object, meta}
105 end
106 end
107
108 @impl true
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
112 {:ok, activity} <-
113 Repo.insert(%Activity{
114 data: object,
115 local: local,
116 recipients: recipients,
117 actor: object["actor"]
118 }),
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
122 end
123 end
124
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
139
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
142 end)
143
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
146
147 {:ok, activity}
148 else
149 %Activity{} = activity ->
150 {:ok, activity}
151
152 {:actor_check, _} ->
153 {:error, false}
154
155 {:containment, _} = error ->
156 error
157
158 {:error, _} = error ->
159 error
160
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
163 data: map,
164 local: local,
165 actor: map["actor"],
166 recipients: recipients,
167 id: "pleroma:fakeid"
168 }
169
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
171 {:ok, activity}
172
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
175
176 {:reject, _} = e ->
177 {:error, e}
178 end
179 end
180
181 defp insert_activity_with_expiration(data, local, recipients) do
182 struct = %Activity{
183 data: data,
184 local: local,
185 actor: data["actor"],
186 recipients: recipients
187 }
188
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
191 end
192 end
193
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
196
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
199 stream_out(activity)
200 stream_out_participations(participations)
201 end
202
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
205 ) do
206 with {:ok, _job} <-
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
210 }) do
211 {:ok, activity}
212 end
213 end
214
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
221 {:ok, conversation}
222 end
223 end
224
225 defp get_participations({:ok, conversation}) do
226 conversation
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
229 end
230
231 defp get_participations(_), do: []
232
233 def stream_out_participations(participations) do
234 participations =
235 participations
236 |> Repo.preload(:user)
237
238 Streamer.stream("participation", participations)
239 end
240
241 @impl true
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
245
246 last_activity_id =
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
248 user: user,
249 blocking_user: user
250 })
251
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
254 end
255 end
256 end
257
258 @impl true
259 def stream_out_participations(_, _), do: :noop
260
261 @impl true
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
264 activity
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
267 end
268
269 @impl true
270 def stream_out(_activity) do
271 :noop
272 end
273
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
277 result
278 end
279 end
280
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
287
288 create_data =
289 make_create_data(
290 %{to: to, actor: actor, published: published, context: context, object: object},
291 additional
292 )
293
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
303 {:ok, activity}
304 else
305 {:quick_insert, true, activity} ->
306 {:ok, activity}
307
308 {:fake, true, activity} ->
309 {:ok, activity}
310
311 {:error, message} ->
312 Repo.rollback(message)
313 end
314 end
315
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
318 :ok
319 end
320
321 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
322 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
323 additional = params[:additional] || %{}
324 # only accept false as false value
325 local = !(params[:local] == false)
326 published = params[:published]
327
328 listen_data =
329 make_listen_data(
330 %{to: to, actor: actor, published: published, context: context, object: object},
331 additional
332 )
333
334 with {:ok, activity} <- insert(listen_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 end
339 end
340
341 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
342 {:ok, Activity.t()} | nil | {:error, any()}
343 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
344 with {:ok, result} <-
345 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
346 result
347 end
348 end
349
350 defp do_unfollow(follower, followed, activity_id, local) do
351 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
352 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
353 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
354 {:ok, activity} <- insert(unfollow_data, local),
355 _ <- notify_and_stream(activity),
356 :ok <- maybe_federate(activity) do
357 {:ok, activity}
358 else
359 nil -> nil
360 {:error, error} -> Repo.rollback(error)
361 end
362 end
363
364 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
365 def flag(params) do
366 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
367 result
368 end
369 end
370
371 defp do_flag(
372 %{
373 actor: actor,
374 context: _context,
375 account: account,
376 statuses: statuses,
377 content: content
378 } = params
379 ) do
380 # only accept false as false value
381 local = !(params[:local] == false)
382 forward = !(params[:forward] == false)
383
384 additional = params[:additional] || %{}
385
386 additional =
387 if forward do
388 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
389 else
390 Map.merge(additional, %{"to" => [], "cc" => []})
391 end
392
393 with flag_data <- make_flag_data(params, additional),
394 {:ok, activity} <- insert(flag_data, local),
395 {:ok, stripped_activity} <- strip_report_status_data(activity),
396 _ <- notify_and_stream(activity),
397 :ok <-
398 maybe_federate(stripped_activity) do
399 User.all_superusers()
400 |> Enum.filter(fn user -> user.ap_id != actor end)
401 |> Enum.filter(fn user -> not is_nil(user.email) end)
402 |> Enum.each(fn superuser ->
403 superuser
404 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
405 |> Pleroma.Emails.Mailer.deliver_async()
406 end)
407
408 {:ok, activity}
409 else
410 {:error, error} -> Repo.rollback(error)
411 end
412 end
413
414 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
415 def move(%User{} = origin, %User{} = target, local \\ true) do
416 params = %{
417 "type" => "Move",
418 "actor" => origin.ap_id,
419 "object" => origin.ap_id,
420 "target" => target.ap_id,
421 "to" => [origin.follower_address]
422 }
423
424 with true <- origin.ap_id in target.also_known_as,
425 {:ok, activity} <- insert(params, local),
426 _ <- notify_and_stream(activity) do
427 maybe_federate(activity)
428
429 BackgroundWorker.enqueue("move_following", %{
430 "origin_id" => origin.id,
431 "target_id" => target.id
432 })
433
434 {:ok, activity}
435 else
436 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
437 err -> err
438 end
439 end
440
441 def fetch_activities_for_context_query(context, opts) do
442 public = [Constants.as_public()]
443
444 recipients =
445 if opts[:user],
446 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
447 else: public
448
449 from(activity in Activity)
450 |> maybe_preload_objects(opts)
451 |> maybe_preload_bookmarks(opts)
452 |> maybe_set_thread_muted_field(opts)
453 |> restrict_blocked(opts)
454 |> restrict_blockers_visibility(opts)
455 |> restrict_recipients(recipients, opts[:user])
456 |> restrict_filtered(opts)
457 |> where(
458 [activity],
459 fragment(
460 "?->>'type' = ? and ?->>'context' = ?",
461 activity.data,
462 "Create",
463 activity.data,
464 ^context
465 )
466 )
467 |> exclude_poll_votes(opts)
468 |> exclude_id(opts)
469 |> order_by([activity], desc: activity.id)
470 end
471
472 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
473 def fetch_activities_for_context(context, opts \\ %{}) do
474 context
475 |> fetch_activities_for_context_query(opts)
476 |> Repo.all()
477 end
478
479 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
480 FlakeId.Ecto.CompatType.t() | nil
481 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
482 context
483 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
484 |> restrict_visibility(%{visibility: "direct"})
485 |> limit(1)
486 |> select([a], a.id)
487 |> Repo.one()
488 end
489
490 defp fetch_paginated_optimized(query, opts, pagination) do
491 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
492 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
493 opts = Map.put(opts, :skip_extra_order, true)
494
495 Pagination.fetch_paginated(query, opts, pagination)
496 end
497
498 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
499 list_memberships = Pleroma.List.memberships(opts[:user])
500
501 fetch_activities_query(recipients ++ list_memberships, opts)
502 |> fetch_paginated_optimized(opts, pagination)
503 |> Enum.reverse()
504 |> maybe_update_cc(list_memberships, opts[:user])
505 end
506
507 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
508 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
509 opts = Map.delete(opts, :user)
510
511 [Constants.as_public()]
512 |> fetch_activities_query(opts)
513 |> restrict_unlisted(opts)
514 |> fetch_paginated_optimized(opts, pagination)
515 end
516
517 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
518 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
519 opts
520 |> Map.put(:restrict_unlisted, true)
521 |> fetch_public_or_unlisted_activities(pagination)
522 end
523
524 @valid_visibilities ~w[direct unlisted public private]
525
526 defp restrict_visibility(query, %{visibility: visibility})
527 when is_list(visibility) do
528 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
529 from(
530 a in query,
531 where:
532 fragment(
533 "activity_visibility(?, ?, ?) = ANY (?)",
534 a.actor,
535 a.recipients,
536 a.data,
537 ^visibility
538 )
539 )
540 else
541 Logger.error("Could not restrict visibility to #{visibility}")
542 end
543 end
544
545 defp restrict_visibility(query, %{visibility: visibility})
546 when visibility in @valid_visibilities do
547 from(
548 a in query,
549 where:
550 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
551 )
552 end
553
554 defp restrict_visibility(_query, %{visibility: visibility})
555 when visibility not in @valid_visibilities do
556 Logger.error("Could not restrict visibility to #{visibility}")
557 end
558
559 defp restrict_visibility(query, _visibility), do: query
560
561 defp exclude_visibility(query, %{exclude_visibilities: visibility})
562 when is_list(visibility) do
563 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
564 from(
565 a in query,
566 where:
567 not fragment(
568 "activity_visibility(?, ?, ?) = ANY (?)",
569 a.actor,
570 a.recipients,
571 a.data,
572 ^visibility
573 )
574 )
575 else
576 Logger.error("Could not exclude visibility to #{visibility}")
577 query
578 end
579 end
580
581 defp exclude_visibility(query, %{exclude_visibilities: visibility})
582 when visibility in @valid_visibilities do
583 from(
584 a in query,
585 where:
586 not fragment(
587 "activity_visibility(?, ?, ?) = ?",
588 a.actor,
589 a.recipients,
590 a.data,
591 ^visibility
592 )
593 )
594 end
595
596 defp exclude_visibility(query, %{exclude_visibilities: visibility})
597 when visibility not in [nil | @valid_visibilities] do
598 Logger.error("Could not exclude visibility to #{visibility}")
599 query
600 end
601
602 defp exclude_visibility(query, _visibility), do: query
603
604 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
605 do: query
606
607 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
608 do: query
609
610 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
611 from(
612 a in query,
613 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
614 )
615 end
616
617 defp restrict_thread_visibility(query, _, _), do: query
618
619 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
620 params =
621 params
622 |> Map.put(:user, reading_user)
623 |> Map.put(:actor_id, user.ap_id)
624
625 %{
626 godmode: params[:godmode],
627 reading_user: reading_user
628 }
629 |> user_activities_recipients()
630 |> fetch_activities(params)
631 |> Enum.reverse()
632 end
633
634 def fetch_user_activities(user, reading_user, params \\ %{})
635
636 def fetch_user_activities(user, reading_user, %{total: true} = params) do
637 result = fetch_activities_for_user(user, reading_user, params)
638
639 Keyword.put(result, :items, Enum.reverse(result[:items]))
640 end
641
642 def fetch_user_activities(user, reading_user, params) do
643 user
644 |> fetch_activities_for_user(reading_user, params)
645 |> Enum.reverse()
646 end
647
648 defp fetch_activities_for_user(user, reading_user, params) do
649 params =
650 params
651 |> Map.put(:type, ["Create", "Announce"])
652 |> Map.put(:user, reading_user)
653 |> Map.put(:actor_id, user.ap_id)
654 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
655
656 params =
657 if User.blocks?(reading_user, user) do
658 params
659 else
660 params
661 |> Map.put(:blocking_user, reading_user)
662 |> Map.put(:muting_user, reading_user)
663 end
664
665 pagination_type = Map.get(params, :pagination_type) || :keyset
666
667 %{
668 godmode: params[:godmode],
669 reading_user: reading_user
670 }
671 |> user_activities_recipients()
672 |> fetch_activities(params, pagination_type)
673 end
674
675 def fetch_statuses(reading_user, %{total: true} = params) do
676 result = fetch_activities_for_reading_user(reading_user, params)
677 Keyword.put(result, :items, Enum.reverse(result[:items]))
678 end
679
680 def fetch_statuses(reading_user, params) do
681 reading_user
682 |> fetch_activities_for_reading_user(params)
683 |> Enum.reverse()
684 end
685
686 defp fetch_activities_for_reading_user(reading_user, params) do
687 params = Map.put(params, :type, ["Create", "Announce"])
688
689 %{
690 godmode: params[:godmode],
691 reading_user: reading_user
692 }
693 |> user_activities_recipients()
694 |> fetch_activities(params, :offset)
695 end
696
697 defp user_activities_recipients(%{godmode: true}), do: []
698
699 defp user_activities_recipients(%{reading_user: reading_user}) do
700 if reading_user do
701 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
702 else
703 [Constants.as_public()]
704 end
705 end
706
707 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
708 raise "Can't use the child object without preloading!"
709 end
710
711 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
712 from(
713 [activity, object] in query,
714 where:
715 fragment(
716 "?->>'type' != ? or ?->>'actor' != ?",
717 activity.data,
718 "Announce",
719 object.data,
720 ^actor
721 )
722 )
723 end
724
725 defp restrict_announce_object_actor(query, _), do: query
726
727 defp restrict_since(query, %{since_id: ""}), do: query
728
729 defp restrict_since(query, %{since_id: since_id}) do
730 from(activity in query, where: activity.id > ^since_id)
731 end
732
733 defp restrict_since(query, _), do: query
734
735 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
736 raise_on_missing_preload()
737 end
738
739 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
740 from(
741 [_activity, object] in query,
742 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
743 )
744 end
745
746 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
747 restrict_embedded_tag_any(query, %{tag: tag})
748 end
749
750 defp restrict_embedded_tag_all(query, _), do: query
751
752 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
753 raise_on_missing_preload()
754 end
755
756 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
757 from(
758 [_activity, object] in query,
759 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
760 )
761 end
762
763 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
764 restrict_embedded_tag_any(query, %{tag: [tag]})
765 end
766
767 defp restrict_embedded_tag_any(query, _), do: query
768
769 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
770 raise_on_missing_preload()
771 end
772
773 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
774 from(
775 [_activity, object] in query,
776 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
777 )
778 end
779
780 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
781 when is_binary(tag_reject) do
782 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
783 end
784
785 defp restrict_embedded_tag_reject_any(query, _), do: query
786
787 defp object_ids_query_for_tags(tags) do
788 from(hto in "hashtags_objects")
789 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
790 |> where([hto, ht], ht.name in ^tags)
791 |> select([hto], hto.object_id)
792 |> distinct([hto], true)
793 end
794
795 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
796 raise_on_missing_preload()
797 end
798
799 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
800 restrict_hashtag_any(query, %{tag: single_tag})
801 end
802
803 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
804 from(
805 [_activity, object] in query,
806 where:
807 fragment(
808 """
809 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
810 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
811 AND hashtags_objects.object_id = ?) @> ?
812 """,
813 ^tags,
814 object.id,
815 ^tags
816 )
817 )
818 end
819
820 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
821 restrict_hashtag_all(query, %{tag_all: [tag]})
822 end
823
824 defp restrict_hashtag_all(query, _), do: query
825
826 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
827 raise_on_missing_preload()
828 end
829
830 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
831 hashtag_ids =
832 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
833 |> Repo.all()
834
835 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
836 from(
837 [_activity, object] in query,
838 join: hto in "hashtags_objects",
839 on: hto.object_id == object.id,
840 where: hto.hashtag_id in ^hashtag_ids,
841 distinct: [desc: object.id],
842 order_by: [desc: object.id]
843 )
844 end
845
846 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
847 restrict_hashtag_any(query, %{tag: [tag]})
848 end
849
850 defp restrict_hashtag_any(query, _), do: query
851
852 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
853 raise_on_missing_preload()
854 end
855
856 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
857 from(
858 [_activity, object] in query,
859 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
860 )
861 end
862
863 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
864 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
865 end
866
867 defp restrict_hashtag_reject_any(query, _), do: query
868
869 defp raise_on_missing_preload do
870 raise "Can't use the child object without preloading!"
871 end
872
873 defp restrict_recipients(query, [], _user), do: query
874
875 defp restrict_recipients(query, recipients, nil) do
876 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
877 end
878
879 defp restrict_recipients(query, recipients, user) do
880 from(
881 activity in query,
882 where: fragment("? && ?", ^recipients, activity.recipients),
883 or_where: activity.actor == ^user.ap_id
884 )
885 end
886
887 defp restrict_local(query, %{local_only: true}) do
888 from(activity in query, where: activity.local == true)
889 end
890
891 defp restrict_local(query, _), do: query
892
893 defp restrict_remote(query, %{remote: true}) do
894 from(activity in query, where: activity.local == false)
895 end
896
897 defp restrict_remote(query, _), do: query
898
899 defp restrict_actor(query, %{actor_id: actor_id}) do
900 from(activity in query, where: activity.actor == ^actor_id)
901 end
902
903 defp restrict_actor(query, _), do: query
904
905 defp restrict_type(query, %{type: type}) when is_binary(type) do
906 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
907 end
908
909 defp restrict_type(query, %{type: type}) do
910 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
911 end
912
913 defp restrict_type(query, _), do: query
914
915 defp restrict_state(query, %{state: state}) do
916 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
917 end
918
919 defp restrict_state(query, _), do: query
920
921 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
922 from(
923 [_activity, object] in query,
924 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
925 )
926 end
927
928 defp restrict_favorited_by(query, _), do: query
929
930 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
931 raise "Can't use the child object without preloading!"
932 end
933
934 defp restrict_media(query, %{only_media: true}) do
935 from(
936 [activity, object] in query,
937 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
938 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
939 )
940 end
941
942 defp restrict_media(query, _), do: query
943
944 defp restrict_replies(query, %{exclude_replies: true}) do
945 from(
946 [_activity, object] in query,
947 where: fragment("?->>'inReplyTo' is null", object.data)
948 )
949 end
950
951 defp restrict_replies(query, %{
952 reply_filtering_user: %User{} = user,
953 reply_visibility: "self"
954 }) do
955 from(
956 [activity, object] in query,
957 where:
958 fragment(
959 "?->>'inReplyTo' is null OR ? = ANY(?)",
960 object.data,
961 ^user.ap_id,
962 activity.recipients
963 )
964 )
965 end
966
967 defp restrict_replies(query, %{
968 reply_filtering_user: %User{} = user,
969 reply_visibility: "following"
970 }) do
971 from(
972 [activity, object] in query,
973 where:
974 fragment(
975 """
976 ?->>'type' != 'Create' -- This isn't a Create
977 OR ?->>'inReplyTo' is null -- this isn't a reply
978 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
979 -- unless they are the author (because authors
980 -- are also part of the recipients). This leads
981 -- to a bug that self-replies by friends won't
982 -- show up.
983 OR ? = ? -- The actor is us
984 """,
985 activity.data,
986 object.data,
987 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
988 activity.recipients,
989 activity.actor,
990 activity.actor,
991 ^user.ap_id
992 )
993 )
994 end
995
996 defp restrict_replies(query, _), do: query
997
998 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
999 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1000 end
1001
1002 defp restrict_reblogs(query, _), do: query
1003
1004 defp restrict_muted(query, %{with_muted: true}), do: query
1005
1006 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1007 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1008
1009 query =
1010 from([activity] in query,
1011 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1012 where:
1013 fragment(
1014 "not (?->'to' \\?| ?) or ? = ?",
1015 activity.data,
1016 ^mutes,
1017 activity.actor,
1018 ^user.ap_id
1019 )
1020 )
1021
1022 unless opts[:skip_preload] do
1023 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1024 else
1025 query
1026 end
1027 end
1028
1029 defp restrict_muted(query, _), do: query
1030
1031 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1032 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1033 domain_blocks = user.domain_blocks || []
1034
1035 following_ap_ids = User.get_friends_ap_ids(user)
1036
1037 query =
1038 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1039
1040 from(
1041 [activity, object: o] in query,
1042 # You don't block the author
1043 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1044
1045 # You don't block any recipients, and didn't author the post
1046 where:
1047 fragment(
1048 "((not (? && ?)) or ? = ?)",
1049 activity.recipients,
1050 ^blocked_ap_ids,
1051 activity.actor,
1052 ^user.ap_id
1053 ),
1054
1055 # You don't block the domain of any recipients, and didn't author the post
1056 where:
1057 fragment(
1058 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1059 activity.recipients,
1060 ^domain_blocks,
1061 activity.actor,
1062 ^user.ap_id
1063 ),
1064
1065 # It's not a boost of a user you block
1066 where:
1067 fragment(
1068 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1069 activity.data,
1070 activity.data,
1071 ^blocked_ap_ids
1072 ),
1073
1074 # You don't block the author's domain, and also don't follow the author
1075 where:
1076 fragment(
1077 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1078 activity.actor,
1079 ^domain_blocks,
1080 activity.actor,
1081 ^following_ap_ids
1082 ),
1083
1084 # Same as above, but checks the Object
1085 where:
1086 fragment(
1087 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1088 o.data,
1089 ^domain_blocks,
1090 o.data,
1091 ^following_ap_ids
1092 )
1093 )
1094 end
1095
1096 defp restrict_blocked(query, _), do: query
1097
1098 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1099 if Config.get([:activitypub, :blockers_visible]) == true do
1100 query
1101 else
1102 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1103
1104 from(
1105 activity in query,
1106 # The author doesn't block you
1107 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1108
1109 # It's not a boost of a user that blocks you
1110 where:
1111 fragment(
1112 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1113 activity.data,
1114 activity.data,
1115 ^blocker_ap_ids
1116 )
1117 )
1118 end
1119 end
1120
1121 defp restrict_blockers_visibility(query, _), do: query
1122
1123 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1124 from(
1125 activity in query,
1126 where:
1127 fragment(
1128 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1129 activity.data,
1130 ^[Constants.as_public()]
1131 )
1132 )
1133 end
1134
1135 defp restrict_unlisted(query, _), do: query
1136
1137 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1138 from(
1139 [activity, object: o] in query,
1140 where:
1141 fragment(
1142 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1143 activity.data,
1144 activity.data,
1145 activity.data,
1146 ^ids
1147 )
1148 )
1149 end
1150
1151 defp restrict_pinned(query, _), do: query
1152
1153 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1154 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1155
1156 from(
1157 activity in query,
1158 where:
1159 fragment(
1160 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1161 activity.data,
1162 activity.actor,
1163 ^muted_reblogs
1164 )
1165 )
1166 end
1167
1168 defp restrict_muted_reblogs(query, _), do: query
1169
1170 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1171 from(
1172 activity in query,
1173 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1174 )
1175 end
1176
1177 defp restrict_instance(query, _), do: query
1178
1179 defp restrict_filtered(query, %{user: %User{} = user}) do
1180 case Filter.compose_regex(user) do
1181 nil ->
1182 query
1183
1184 regex ->
1185 from([activity, object] in query,
1186 where:
1187 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1188 activity.actor == ^user.ap_id
1189 )
1190 end
1191 end
1192
1193 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1194 restrict_filtered(query, %{user: user})
1195 end
1196
1197 defp restrict_filtered(query, _), do: query
1198
1199 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1200
1201 defp exclude_poll_votes(query, _) do
1202 if has_named_binding?(query, :object) do
1203 from([activity, object: o] in query,
1204 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1205 )
1206 else
1207 query
1208 end
1209 end
1210
1211 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1212
1213 defp exclude_chat_messages(query, _) do
1214 if has_named_binding?(query, :object) do
1215 from([activity, object: o] in query,
1216 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1217 )
1218 else
1219 query
1220 end
1221 end
1222
1223 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1224
1225 defp exclude_invisible_actors(query, _opts) do
1226 invisible_ap_ids =
1227 User.Query.build(%{invisible: true, select: [:ap_id]})
1228 |> Repo.all()
1229 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1230
1231 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1232 end
1233
1234 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1235 from(activity in query, where: activity.id != ^id)
1236 end
1237
1238 defp exclude_id(query, _), do: query
1239
1240 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1241
1242 defp maybe_preload_objects(query, _) do
1243 query
1244 |> Activity.with_preloaded_object()
1245 end
1246
1247 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1248
1249 defp maybe_preload_bookmarks(query, opts) do
1250 query
1251 |> Activity.with_preloaded_bookmark(opts[:user])
1252 end
1253
1254 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1255 query
1256 |> Activity.with_preloaded_report_notes()
1257 end
1258
1259 defp maybe_preload_report_notes(query, _), do: query
1260
1261 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1262
1263 defp maybe_set_thread_muted_field(query, opts) do
1264 query
1265 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1266 end
1267
1268 defp maybe_order(query, %{order: :desc}) do
1269 query
1270 |> order_by(desc: :id)
1271 end
1272
1273 defp maybe_order(query, %{order: :asc}) do
1274 query
1275 |> order_by(asc: :id)
1276 end
1277
1278 defp maybe_order(query, _), do: query
1279
1280 defp normalize_fetch_activities_query_opts(opts) do
1281 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1282 case opts[key] do
1283 value when is_bitstring(value) ->
1284 Map.put(opts, key, Hashtag.normalize_name(value))
1285
1286 value when is_list(value) ->
1287 normalized_value =
1288 value
1289 |> Enum.map(&Hashtag.normalize_name/1)
1290 |> Enum.uniq()
1291
1292 Map.put(opts, key, normalized_value)
1293
1294 _ ->
1295 opts
1296 end
1297 end)
1298 end
1299
1300 defp fetch_activities_query_ap_ids_ops(opts) do
1301 source_user = opts[:muting_user]
1302 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1303
1304 ap_id_relationships =
1305 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1306 [:block | ap_id_relationships]
1307 else
1308 ap_id_relationships
1309 end
1310
1311 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1312
1313 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1314 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1315
1316 restrict_muted_reblogs_opts =
1317 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1318
1319 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1320 end
1321
1322 def fetch_activities_query(recipients, opts \\ %{}) do
1323 opts = normalize_fetch_activities_query_opts(opts)
1324
1325 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1326 fetch_activities_query_ap_ids_ops(opts)
1327
1328 config = %{
1329 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1330 }
1331
1332 query =
1333 Activity
1334 |> maybe_preload_objects(opts)
1335 |> maybe_preload_bookmarks(opts)
1336 |> maybe_preload_report_notes(opts)
1337 |> maybe_set_thread_muted_field(opts)
1338 |> maybe_order(opts)
1339 |> restrict_recipients(recipients, opts[:user])
1340 |> restrict_replies(opts)
1341 |> restrict_since(opts)
1342 |> restrict_local(opts)
1343 |> restrict_remote(opts)
1344 |> restrict_actor(opts)
1345 |> restrict_type(opts)
1346 |> restrict_state(opts)
1347 |> restrict_favorited_by(opts)
1348 |> restrict_blocked(restrict_blocked_opts)
1349 |> restrict_blockers_visibility(opts)
1350 |> restrict_muted(restrict_muted_opts)
1351 |> restrict_filtered(opts)
1352 |> restrict_media(opts)
1353 |> restrict_visibility(opts)
1354 |> restrict_thread_visibility(opts, config)
1355 |> restrict_reblogs(opts)
1356 |> restrict_pinned(opts)
1357 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1358 |> restrict_instance(opts)
1359 |> restrict_announce_object_actor(opts)
1360 |> restrict_filtered(opts)
1361 |> Activity.restrict_deactivated_users()
1362 |> exclude_poll_votes(opts)
1363 |> exclude_chat_messages(opts)
1364 |> exclude_invisible_actors(opts)
1365 |> exclude_visibility(opts)
1366
1367 if Config.feature_enabled?(:improved_hashtag_timeline) do
1368 query
1369 |> restrict_hashtag_any(opts)
1370 |> restrict_hashtag_all(opts)
1371 |> restrict_hashtag_reject_any(opts)
1372 else
1373 query
1374 |> restrict_embedded_tag_any(opts)
1375 |> restrict_embedded_tag_all(opts)
1376 |> restrict_embedded_tag_reject_any(opts)
1377 end
1378 end
1379
1380 @doc """
1381 Fetch favorites activities of user with order by sort adds to favorites
1382 """
1383 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1384 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1385 user.ap_id
1386 |> Activity.Queries.by_actor()
1387 |> Activity.Queries.by_type("Like")
1388 |> Activity.with_joined_object()
1389 |> Object.with_joined_activity()
1390 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1391 |> order_by([like, _, _], desc_nulls_last: like.id)
1392 |> Pagination.fetch_paginated(
1393 Map.merge(params, %{skip_order: true}),
1394 pagination
1395 )
1396 end
1397
1398 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1399 Enum.map(activities, fn
1400 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1401 if Enum.any?(bcc, &(&1 in list_memberships)) do
1402 update_in(activity.data["cc"], &[user_ap_id | &1])
1403 else
1404 activity
1405 end
1406
1407 activity ->
1408 activity
1409 end)
1410 end
1411
1412 defp maybe_update_cc(activities, _, _), do: activities
1413
1414 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1415 from(activity in query,
1416 where:
1417 fragment("? && ?", activity.recipients, ^recipients) or
1418 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1419 ^Constants.as_public() in activity.recipients)
1420 )
1421 end
1422
1423 def fetch_activities_bounded(
1424 recipients,
1425 recipients_with_public,
1426 opts \\ %{},
1427 pagination \\ :keyset
1428 ) do
1429 fetch_activities_query([], opts)
1430 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1431 |> Pagination.fetch_paginated(opts, pagination)
1432 |> Enum.reverse()
1433 end
1434
1435 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1436 def upload(file, opts \\ []) do
1437 with {:ok, data} <- Upload.store(file, opts) do
1438 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1439
1440 Repo.insert(%Object{data: obj_data})
1441 end
1442 end
1443
1444 @spec get_actor_url(any()) :: binary() | nil
1445 defp get_actor_url(url) when is_binary(url), do: url
1446 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1447
1448 defp get_actor_url(url) when is_list(url) do
1449 url
1450 |> List.first()
1451 |> get_actor_url()
1452 end
1453
1454 defp get_actor_url(_url), do: nil
1455
1456 defp normalize_image(%{"url" => url}) do
1457 %{
1458 "type" => "Image",
1459 "url" => [%{"href" => url}]
1460 }
1461 end
1462
1463 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1464 defp normalize_image(_), do: nil
1465
1466 defp object_to_user_data(data) do
1467 fields =
1468 data
1469 |> Map.get("attachment", [])
1470 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1471 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1472
1473 emojis =
1474 data
1475 |> Map.get("tag", [])
1476 |> Enum.filter(fn
1477 %{"type" => "Emoji"} -> true
1478 _ -> false
1479 end)
1480 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1481 {String.trim(name, ":"), url}
1482 end)
1483
1484 is_locked = data["manuallyApprovesFollowers"] || false
1485 capabilities = data["capabilities"] || %{}
1486 accepts_chat_messages = capabilities["acceptsChatMessages"]
1487 data = Transmogrifier.maybe_fix_user_object(data)
1488 is_discoverable = data["discoverable"] || false
1489 invisible = data["invisible"] || false
1490 actor_type = data["type"] || "Person"
1491
1492 featured_address = data["featured"]
1493 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1494
1495 public_key =
1496 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1497 data["publicKey"]["publicKeyPem"]
1498 else
1499 nil
1500 end
1501
1502 shared_inbox =
1503 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1504 data["endpoints"]["sharedInbox"]
1505 else
1506 nil
1507 end
1508
1509 user_data = %{
1510 ap_id: data["id"],
1511 uri: get_actor_url(data["url"]),
1512 ap_enabled: true,
1513 banner: normalize_image(data["image"]),
1514 fields: fields,
1515 emoji: emojis,
1516 is_locked: is_locked,
1517 is_discoverable: is_discoverable,
1518 invisible: invisible,
1519 avatar: normalize_image(data["icon"]),
1520 name: data["name"],
1521 follower_address: data["followers"],
1522 following_address: data["following"],
1523 featured_address: featured_address,
1524 bio: data["summary"] || "",
1525 actor_type: actor_type,
1526 also_known_as: Map.get(data, "alsoKnownAs", []),
1527 public_key: public_key,
1528 inbox: data["inbox"],
1529 shared_inbox: shared_inbox,
1530 accepts_chat_messages: accepts_chat_messages,
1531 pinned_objects: pinned_objects
1532 }
1533
1534 # nickname can be nil because of virtual actors
1535 if data["preferredUsername"] do
1536 Map.put(
1537 user_data,
1538 :nickname,
1539 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1540 )
1541 else
1542 Map.put(user_data, :nickname, nil)
1543 end
1544 end
1545
1546 def fetch_follow_information_for_user(user) do
1547 with {:ok, following_data} <-
1548 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1549 {:ok, hide_follows} <- collection_private(following_data),
1550 {:ok, followers_data} <-
1551 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1552 {:ok, hide_followers} <- collection_private(followers_data) do
1553 {:ok,
1554 %{
1555 hide_follows: hide_follows,
1556 follower_count: normalize_counter(followers_data["totalItems"]),
1557 following_count: normalize_counter(following_data["totalItems"]),
1558 hide_followers: hide_followers
1559 }}
1560 else
1561 {:error, _} = e -> e
1562 e -> {:error, e}
1563 end
1564 end
1565
1566 defp normalize_counter(counter) when is_integer(counter), do: counter
1567 defp normalize_counter(_), do: 0
1568
1569 def maybe_update_follow_information(user_data) do
1570 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1571 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1572 {_, true} <-
1573 {:collections_available,
1574 !!(user_data[:following_address] && user_data[:follower_address])},
1575 {:ok, info} <-
1576 fetch_follow_information_for_user(user_data) do
1577 info = Map.merge(user_data[:info] || %{}, info)
1578
1579 user_data
1580 |> Map.put(:info, info)
1581 else
1582 {:user_type_check, false} ->
1583 user_data
1584
1585 {:collections_available, false} ->
1586 user_data
1587
1588 {:enabled, false} ->
1589 user_data
1590
1591 e ->
1592 Logger.error(
1593 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1594 )
1595
1596 user_data
1597 end
1598 end
1599
1600 defp collection_private(%{"first" => %{"type" => type}})
1601 when type in ["CollectionPage", "OrderedCollectionPage"],
1602 do: {:ok, false}
1603
1604 defp collection_private(%{"first" => first}) do
1605 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1606 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1607 {:ok, false}
1608 else
1609 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1610 {:error, _} = e -> e
1611 e -> {:error, e}
1612 end
1613 end
1614
1615 defp collection_private(_data), do: {:ok, true}
1616
1617 def user_data_from_user_object(data) do
1618 with {:ok, data} <- MRF.filter(data) do
1619 {:ok, object_to_user_data(data)}
1620 else
1621 e -> {:error, e}
1622 end
1623 end
1624
1625 def fetch_and_prepare_user_from_ap_id(ap_id) do
1626 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1627 {:ok, data} <- user_data_from_user_object(data) do
1628 {:ok, maybe_update_follow_information(data)}
1629 else
1630 # If this has been deleted, only log a debug and not an error
1631 {:error, "Object has been deleted" = e} ->
1632 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1633 {:error, e}
1634
1635 {:error, {:reject, reason} = e} ->
1636 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1637 {:error, e}
1638
1639 {:error, e} ->
1640 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1641 {:error, e}
1642 end
1643 end
1644
1645 def maybe_handle_clashing_nickname(data) do
1646 with nickname when is_binary(nickname) <- data[:nickname],
1647 %User{} = old_user <- User.get_by_nickname(nickname),
1648 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1649 Logger.info(
1650 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1651 )
1652
1653 old_user
1654 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1655 |> User.update_and_set_cache()
1656 else
1657 {:ap_id_comparison, true} ->
1658 Logger.info(
1659 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1660 )
1661
1662 _ ->
1663 nil
1664 end
1665 end
1666
1667 def pin_data_from_featured_collection(%{
1668 "type" => "OrderedCollection",
1669 "first" => first
1670 }) do
1671 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1672 page
1673 |> Map.get("orderedItems")
1674 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1675 else
1676 e ->
1677 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1678 {:ok, %{}}
1679 end
1680 end
1681
1682 def pin_data_from_featured_collection(
1683 %{
1684 "type" => type
1685 } = collection
1686 )
1687 when type in ["OrderedCollection", "Collection"] do
1688 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1689 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1690 end
1691
1692 def fetch_and_prepare_featured_from_ap_id(nil) do
1693 {:ok, %{}}
1694 end
1695
1696 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1697 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1698 {:ok, pin_data_from_featured_collection(data)}
1699 else
1700 e ->
1701 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1702 {:ok, %{}}
1703 end
1704 end
1705
1706 def pinned_fetch_task(nil), do: nil
1707
1708 def pinned_fetch_task(%{pinned_objects: pins}) do
1709 if Enum.all?(pins, fn {ap_id, _} ->
1710 Object.get_cached_by_ap_id(ap_id) ||
1711 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1712 end) do
1713 :ok
1714 else
1715 :error
1716 end
1717 end
1718
1719 def make_user_from_ap_id(ap_id) do
1720 user = User.get_cached_by_ap_id(ap_id)
1721
1722 if user && !User.ap_enabled?(user) do
1723 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1724 else
1725 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1726 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1727
1728 if user do
1729 user
1730 |> User.remote_user_changeset(data)
1731 |> User.update_and_set_cache()
1732 else
1733 maybe_handle_clashing_nickname(data)
1734
1735 data
1736 |> User.remote_user_changeset()
1737 |> Repo.insert()
1738 |> User.set_cache()
1739 end
1740 end
1741 end
1742 end
1743
1744 def make_user_from_nickname(nickname) do
1745 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1746 make_user_from_ap_id(ap_id)
1747 else
1748 _e -> {:error, "No AP id in WebFinger"}
1749 end
1750 end
1751
1752 # filter out broken threads
1753 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1754 entire_thread_visible_for_user?(activity, user)
1755 end
1756
1757 # do post-processing on a specific activity
1758 def contain_activity(%Activity{} = activity, %User{} = user) do
1759 contain_broken_threads(activity, user)
1760 end
1761
1762 def fetch_direct_messages_query do
1763 Activity
1764 |> restrict_type(%{type: "Create"})
1765 |> restrict_visibility(%{visibility: "direct"})
1766 |> order_by([activity], asc: activity.id)
1767 end
1768 end