7560969525f041506c629c488a700aa92cee0d92
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
29
30 import Ecto.Query
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
33
34 require Logger
35 require Pleroma.Constants
36
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
54 {recipients, to, cc}
55 end
56
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
63 _ -> false
64 end
65 end
66
67 defp check_actor_can_insert(_), do: true
68
69 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70 limit = Config.get([:instance, :remote_limit])
71 String.length(content) <= limit
72 end
73
74 defp check_remote_limit(_), do: true
75
76 def increase_note_count_if_public(actor, object) do
77 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78 end
79
80 def decrease_note_count_if_public(actor, object) do
81 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82 end
83
84 def update_last_status_at_if_public(actor, object) do
85 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
86 end
87
88 defp increase_replies_count_if_reply(%{
89 "object" => %{"inReplyTo" => reply_ap_id} = object,
90 "type" => "Create"
91 }) do
92 if is_public?(object) do
93 Object.increase_replies_count(reply_ap_id)
94 end
95 end
96
97 defp increase_replies_count_if_reply(_create_data), do: :noop
98
99 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
100 @impl true
101 def persist(%{"type" => type} = object, meta) when type in @object_types do
102 with {:ok, object} <- Object.create(object) do
103 {:ok, object, meta}
104 end
105 end
106
107 @impl true
108 def persist(object, meta) do
109 with local <- Keyword.fetch!(meta, :local),
110 {recipients, _, _} <- get_recipients(object),
111 {:ok, activity} <-
112 Repo.insert(%Activity{
113 data: object,
114 local: local,
115 recipients: recipients,
116 actor: object["actor"]
117 }),
118 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
119 {:ok, _} <- maybe_create_activity_expiration(activity) do
120 {:ok, activity, meta}
121 end
122 end
123
124 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
125 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
128 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
129 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
134 {:ok, map, object} <- insert_full_object(map),
135 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
136 # Splice in the child object if we have one.
137 activity = Maps.put_if_present(activity, :object, object)
138
139 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
140 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
141 end)
142
143 {:ok, activity}
144 else
145 %Activity{} = activity ->
146 {:ok, activity}
147
148 {:actor_check, _} ->
149 {:error, false}
150
151 {:containment, _} = error ->
152 error
153
154 {:error, _} = error ->
155 error
156
157 {:fake, true, map, recipients} ->
158 activity = %Activity{
159 data: map,
160 local: local,
161 actor: map["actor"],
162 recipients: recipients,
163 id: "pleroma:fakeid"
164 }
165
166 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167 {:ok, activity}
168
169 {:remote_limit_pass, _} ->
170 {:error, :remote_limit}
171
172 {:reject, _} = e ->
173 {:error, e}
174 end
175 end
176
177 defp insert_activity_with_expiration(data, local, recipients) do
178 struct = %Activity{
179 data: data,
180 local: local,
181 actor: data["actor"],
182 recipients: recipients
183 }
184
185 with {:ok, activity} <- Repo.insert(struct) do
186 maybe_create_activity_expiration(activity)
187 end
188 end
189
190 def notify_and_stream(activity) do
191 Notification.create_notifications(activity)
192
193 conversation = create_or_bump_conversation(activity, activity.actor)
194 participations = get_participations(conversation)
195 stream_out(activity)
196 stream_out_participations(participations)
197 end
198
199 defp maybe_create_activity_expiration(
200 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
201 ) do
202 with {:ok, _job} <-
203 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
204 activity_id: activity.id,
205 expires_at: expires_at
206 }) do
207 {:ok, activity}
208 end
209 end
210
211 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
212
213 defp create_or_bump_conversation(activity, actor) do
214 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
215 %User{} = user <- User.get_cached_by_ap_id(actor) do
216 Participation.mark_as_read(user, conversation)
217 {:ok, conversation}
218 end
219 end
220
221 defp get_participations({:ok, conversation}) do
222 conversation
223 |> Repo.preload(:participations, force: true)
224 |> Map.get(:participations)
225 end
226
227 defp get_participations(_), do: []
228
229 def stream_out_participations(participations) do
230 participations =
231 participations
232 |> Repo.preload(:user)
233
234 Streamer.stream("participation", participations)
235 end
236
237 @impl true
238 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
239 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
240 conversation = Repo.preload(conversation, :participations)
241
242 last_activity_id =
243 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
244 user: user,
245 blocking_user: user
246 })
247
248 if last_activity_id do
249 stream_out_participations(conversation.participations)
250 end
251 end
252 end
253
254 @impl true
255 def stream_out_participations(_, _), do: :noop
256
257 @impl true
258 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
259 when data_type in ["Create", "Announce", "Delete"] do
260 activity
261 |> Topics.get_activity_topics()
262 |> Streamer.stream(activity)
263 end
264
265 @impl true
266 def stream_out(_activity) do
267 :noop
268 end
269
270 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
271 def create(params, fake \\ false) do
272 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
273 result
274 end
275 end
276
277 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
278 additional = params[:additional] || %{}
279 # only accept false as false value
280 local = !(params[:local] == false)
281 published = params[:published]
282 quick_insert? = Config.get([:env]) == :benchmark
283
284 create_data =
285 make_create_data(
286 %{to: to, actor: actor, published: published, context: context, object: object},
287 additional
288 )
289
290 with {:ok, activity} <- insert(create_data, local, fake),
291 {:fake, false, activity} <- {:fake, fake, activity},
292 _ <- increase_replies_count_if_reply(create_data),
293 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
294 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
295 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
296 _ <- notify_and_stream(activity),
297 :ok <- maybe_schedule_poll_notifications(activity),
298 :ok <- maybe_federate(activity) do
299 {:ok, activity}
300 else
301 {:quick_insert, true, activity} ->
302 {:ok, activity}
303
304 {:fake, true, activity} ->
305 {:ok, activity}
306
307 {:error, message} ->
308 Repo.rollback(message)
309 end
310 end
311
312 defp maybe_schedule_poll_notifications(activity) do
313 PollWorker.schedule_poll_end(activity)
314 :ok
315 end
316
317 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
318 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
319 additional = params[:additional] || %{}
320 # only accept false as false value
321 local = !(params[:local] == false)
322 published = params[:published]
323
324 listen_data =
325 make_listen_data(
326 %{to: to, actor: actor, published: published, context: context, object: object},
327 additional
328 )
329
330 with {:ok, activity} <- insert(listen_data, local),
331 _ <- notify_and_stream(activity),
332 :ok <- maybe_federate(activity) do
333 {:ok, activity}
334 end
335 end
336
337 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
338 {:ok, Activity.t()} | nil | {:error, any()}
339 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
340 with {:ok, result} <-
341 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
342 result
343 end
344 end
345
346 defp do_unfollow(follower, followed, activity_id, local) do
347 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
348 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
349 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
350 {:ok, activity} <- insert(unfollow_data, local),
351 _ <- notify_and_stream(activity),
352 :ok <- maybe_federate(activity) do
353 {:ok, activity}
354 else
355 nil -> nil
356 {:error, error} -> Repo.rollback(error)
357 end
358 end
359
360 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
361 def flag(params) do
362 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
363 result
364 end
365 end
366
367 defp do_flag(
368 %{
369 actor: actor,
370 context: _context,
371 account: account,
372 statuses: statuses,
373 content: content
374 } = params
375 ) do
376 # only accept false as false value
377 local = !(params[:local] == false)
378 forward = !(params[:forward] == false)
379
380 additional = params[:additional] || %{}
381
382 additional =
383 if forward do
384 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
385 else
386 Map.merge(additional, %{"to" => [], "cc" => []})
387 end
388
389 with flag_data <- make_flag_data(params, additional),
390 {:ok, activity} <- insert(flag_data, local),
391 {:ok, stripped_activity} <- strip_report_status_data(activity),
392 _ <- notify_and_stream(activity),
393 :ok <-
394 maybe_federate(stripped_activity) do
395 User.all_superusers()
396 |> Enum.filter(fn user -> user.ap_id != actor end)
397 |> Enum.filter(fn user -> not is_nil(user.email) end)
398 |> Enum.each(fn superuser ->
399 superuser
400 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
401 |> Pleroma.Emails.Mailer.deliver_async()
402 end)
403
404 {:ok, activity}
405 else
406 {:error, error} -> Repo.rollback(error)
407 end
408 end
409
410 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
411 def move(%User{} = origin, %User{} = target, local \\ true) do
412 params = %{
413 "type" => "Move",
414 "actor" => origin.ap_id,
415 "object" => origin.ap_id,
416 "target" => target.ap_id
417 }
418
419 with true <- origin.ap_id in target.also_known_as,
420 {:ok, activity} <- insert(params, local),
421 _ <- notify_and_stream(activity) do
422 maybe_federate(activity)
423
424 BackgroundWorker.enqueue("move_following", %{
425 "origin_id" => origin.id,
426 "target_id" => target.id
427 })
428
429 {:ok, activity}
430 else
431 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
432 err -> err
433 end
434 end
435
436 def fetch_activities_for_context_query(context, opts) do
437 public = [Constants.as_public()]
438
439 recipients =
440 if opts[:user],
441 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
442 else: public
443
444 from(activity in Activity)
445 |> maybe_preload_objects(opts)
446 |> maybe_preload_bookmarks(opts)
447 |> maybe_set_thread_muted_field(opts)
448 |> restrict_blocked(opts)
449 |> restrict_blockers_visibility(opts)
450 |> restrict_recipients(recipients, opts[:user])
451 |> restrict_filtered(opts)
452 |> where(
453 [activity],
454 fragment(
455 "?->>'type' = ? and ?->>'context' = ?",
456 activity.data,
457 "Create",
458 activity.data,
459 ^context
460 )
461 )
462 |> exclude_poll_votes(opts)
463 |> exclude_id(opts)
464 |> order_by([activity], desc: activity.id)
465 end
466
467 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
468 def fetch_activities_for_context(context, opts \\ %{}) do
469 context
470 |> fetch_activities_for_context_query(opts)
471 |> Repo.all()
472 end
473
474 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
475 FlakeId.Ecto.CompatType.t() | nil
476 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
477 context
478 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
479 |> restrict_visibility(%{visibility: "direct"})
480 |> limit(1)
481 |> select([a], a.id)
482 |> Repo.one()
483 end
484
485 defp fetch_paginated_optimized(query, opts, pagination) do
486 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
487 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
488 opts = Map.put(opts, :skip_extra_order, true)
489
490 Pagination.fetch_paginated(query, opts, pagination)
491 end
492
493 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
494 list_memberships = Pleroma.List.memberships(opts[:user])
495
496 fetch_activities_query(recipients ++ list_memberships, opts)
497 |> fetch_paginated_optimized(opts, pagination)
498 |> Enum.reverse()
499 |> maybe_update_cc(list_memberships, opts[:user])
500 end
501
502 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
503 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
504 opts = Map.delete(opts, :user)
505
506 [Constants.as_public()]
507 |> fetch_activities_query(opts)
508 |> restrict_unlisted(opts)
509 |> fetch_paginated_optimized(opts, pagination)
510 end
511
512 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
513 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
514 opts
515 |> Map.put(:restrict_unlisted, true)
516 |> fetch_public_or_unlisted_activities(pagination)
517 end
518
519 @valid_visibilities ~w[direct unlisted public private]
520
521 defp restrict_visibility(query, %{visibility: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not restrict visibility to #{visibility}")
537 end
538 end
539
540 defp restrict_visibility(query, %{visibility: visibility})
541 when visibility in @valid_visibilities do
542 from(
543 a in query,
544 where:
545 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
546 )
547 end
548
549 defp restrict_visibility(_query, %{visibility: visibility})
550 when visibility not in @valid_visibilities do
551 Logger.error("Could not restrict visibility to #{visibility}")
552 end
553
554 defp restrict_visibility(query, _visibility), do: query
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when is_list(visibility) do
558 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
559 from(
560 a in query,
561 where:
562 not fragment(
563 "activity_visibility(?, ?, ?) = ANY (?)",
564 a.actor,
565 a.recipients,
566 a.data,
567 ^visibility
568 )
569 )
570 else
571 Logger.error("Could not exclude visibility to #{visibility}")
572 query
573 end
574 end
575
576 defp exclude_visibility(query, %{exclude_visibilities: visibility})
577 when visibility in @valid_visibilities do
578 from(
579 a in query,
580 where:
581 not fragment(
582 "activity_visibility(?, ?, ?) = ?",
583 a.actor,
584 a.recipients,
585 a.data,
586 ^visibility
587 )
588 )
589 end
590
591 defp exclude_visibility(query, %{exclude_visibilities: visibility})
592 when visibility not in [nil | @valid_visibilities] do
593 Logger.error("Could not exclude visibility to #{visibility}")
594 query
595 end
596
597 defp exclude_visibility(query, _visibility), do: query
598
599 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
600 do: query
601
602 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
603 do: query
604
605 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
606 from(
607 a in query,
608 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
609 )
610 end
611
612 defp restrict_thread_visibility(query, _, _), do: query
613
614 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
615 params =
616 params
617 |> Map.put(:user, reading_user)
618 |> Map.put(:actor_id, user.ap_id)
619
620 %{
621 godmode: params[:godmode],
622 reading_user: reading_user
623 }
624 |> user_activities_recipients()
625 |> fetch_activities(params)
626 |> Enum.reverse()
627 end
628
629 def fetch_user_activities(user, reading_user, params \\ %{})
630
631 def fetch_user_activities(user, reading_user, %{total: true} = params) do
632 result = fetch_activities_for_user(user, reading_user, params)
633
634 Keyword.put(result, :items, Enum.reverse(result[:items]))
635 end
636
637 def fetch_user_activities(user, reading_user, params) do
638 user
639 |> fetch_activities_for_user(reading_user, params)
640 |> Enum.reverse()
641 end
642
643 defp fetch_activities_for_user(user, reading_user, params) do
644 params =
645 params
646 |> Map.put(:type, ["Create", "Announce"])
647 |> Map.put(:user, reading_user)
648 |> Map.put(:actor_id, user.ap_id)
649 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
650
651 params =
652 if User.blocks?(reading_user, user) do
653 params
654 else
655 params
656 |> Map.put(:blocking_user, reading_user)
657 |> Map.put(:muting_user, reading_user)
658 end
659
660 pagination_type = Map.get(params, :pagination_type) || :keyset
661
662 %{
663 godmode: params[:godmode],
664 reading_user: reading_user
665 }
666 |> user_activities_recipients()
667 |> fetch_activities(params, pagination_type)
668 end
669
670 def fetch_statuses(reading_user, %{total: true} = params) do
671 result = fetch_activities_for_reading_user(reading_user, params)
672 Keyword.put(result, :items, Enum.reverse(result[:items]))
673 end
674
675 def fetch_statuses(reading_user, params) do
676 reading_user
677 |> fetch_activities_for_reading_user(params)
678 |> Enum.reverse()
679 end
680
681 defp fetch_activities_for_reading_user(reading_user, params) do
682 params = Map.put(params, :type, ["Create", "Announce"])
683
684 %{
685 godmode: params[:godmode],
686 reading_user: reading_user
687 }
688 |> user_activities_recipients()
689 |> fetch_activities(params, :offset)
690 end
691
692 defp user_activities_recipients(%{godmode: true}), do: []
693
694 defp user_activities_recipients(%{reading_user: reading_user}) do
695 if reading_user do
696 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
697 else
698 [Constants.as_public()]
699 end
700 end
701
702 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
703 raise "Can't use the child object without preloading!"
704 end
705
706 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
707 from(
708 [activity, object] in query,
709 where:
710 fragment(
711 "?->>'type' != ? or ?->>'actor' != ?",
712 activity.data,
713 "Announce",
714 object.data,
715 ^actor
716 )
717 )
718 end
719
720 defp restrict_announce_object_actor(query, _), do: query
721
722 defp restrict_since(query, %{since_id: ""}), do: query
723
724 defp restrict_since(query, %{since_id: since_id}) do
725 from(activity in query, where: activity.id > ^since_id)
726 end
727
728 defp restrict_since(query, _), do: query
729
730 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
731 raise_on_missing_preload()
732 end
733
734 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
735 from(
736 [_activity, object] in query,
737 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
738 )
739 end
740
741 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
742 restrict_embedded_tag_any(query, %{tag: tag})
743 end
744
745 defp restrict_embedded_tag_all(query, _), do: query
746
747 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
748 raise_on_missing_preload()
749 end
750
751 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
752 from(
753 [_activity, object] in query,
754 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
755 )
756 end
757
758 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
759 restrict_embedded_tag_any(query, %{tag: [tag]})
760 end
761
762 defp restrict_embedded_tag_any(query, _), do: query
763
764 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
765 raise_on_missing_preload()
766 end
767
768 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
769 from(
770 [_activity, object] in query,
771 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
772 )
773 end
774
775 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
776 when is_binary(tag_reject) do
777 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
778 end
779
780 defp restrict_embedded_tag_reject_any(query, _), do: query
781
782 defp object_ids_query_for_tags(tags) do
783 from(hto in "hashtags_objects")
784 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
785 |> where([hto, ht], ht.name in ^tags)
786 |> select([hto], hto.object_id)
787 |> distinct([hto], true)
788 end
789
790 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
791 raise_on_missing_preload()
792 end
793
794 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
795 restrict_hashtag_any(query, %{tag: single_tag})
796 end
797
798 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
799 from(
800 [_activity, object] in query,
801 where:
802 fragment(
803 """
804 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
805 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
806 AND hashtags_objects.object_id = ?) @> ?
807 """,
808 ^tags,
809 object.id,
810 ^tags
811 )
812 )
813 end
814
815 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
816 restrict_hashtag_all(query, %{tag_all: [tag]})
817 end
818
819 defp restrict_hashtag_all(query, _), do: query
820
821 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
822 raise_on_missing_preload()
823 end
824
825 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
826 hashtag_ids =
827 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
828 |> Repo.all()
829
830 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
831 from(
832 [_activity, object] in query,
833 join: hto in "hashtags_objects",
834 on: hto.object_id == object.id,
835 where: hto.hashtag_id in ^hashtag_ids,
836 distinct: [desc: object.id],
837 order_by: [desc: object.id]
838 )
839 end
840
841 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
842 restrict_hashtag_any(query, %{tag: [tag]})
843 end
844
845 defp restrict_hashtag_any(query, _), do: query
846
847 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
848 raise_on_missing_preload()
849 end
850
851 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
852 from(
853 [_activity, object] in query,
854 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
855 )
856 end
857
858 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
859 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
860 end
861
862 defp restrict_hashtag_reject_any(query, _), do: query
863
864 defp raise_on_missing_preload do
865 raise "Can't use the child object without preloading!"
866 end
867
868 defp restrict_recipients(query, [], _user), do: query
869
870 defp restrict_recipients(query, recipients, nil) do
871 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
872 end
873
874 defp restrict_recipients(query, recipients, user) do
875 from(
876 activity in query,
877 where: fragment("? && ?", ^recipients, activity.recipients),
878 or_where: activity.actor == ^user.ap_id
879 )
880 end
881
882 defp restrict_local(query, %{local_only: true}) do
883 from(activity in query, where: activity.local == true)
884 end
885
886 defp restrict_local(query, _), do: query
887
888 defp restrict_remote(query, %{remote: true}) do
889 from(activity in query, where: activity.local == false)
890 end
891
892 defp restrict_remote(query, _), do: query
893
894 defp restrict_actor(query, %{actor_id: actor_id}) do
895 from(activity in query, where: activity.actor == ^actor_id)
896 end
897
898 defp restrict_actor(query, _), do: query
899
900 defp restrict_type(query, %{type: type}) when is_binary(type) do
901 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
902 end
903
904 defp restrict_type(query, %{type: type}) do
905 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
906 end
907
908 defp restrict_type(query, _), do: query
909
910 defp restrict_state(query, %{state: state}) do
911 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
912 end
913
914 defp restrict_state(query, _), do: query
915
916 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
917 from(
918 [_activity, object] in query,
919 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
920 )
921 end
922
923 defp restrict_favorited_by(query, _), do: query
924
925 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
926 raise "Can't use the child object without preloading!"
927 end
928
929 defp restrict_media(query, %{only_media: true}) do
930 from(
931 [activity, object] in query,
932 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
933 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
934 )
935 end
936
937 defp restrict_media(query, _), do: query
938
939 defp restrict_replies(query, %{exclude_replies: true}) do
940 from(
941 [_activity, object] in query,
942 where: fragment("?->>'inReplyTo' is null", object.data)
943 )
944 end
945
946 defp restrict_replies(query, %{
947 reply_filtering_user: %User{} = user,
948 reply_visibility: "self"
949 }) do
950 from(
951 [activity, object] in query,
952 where:
953 fragment(
954 "?->>'inReplyTo' is null OR ? = ANY(?)",
955 object.data,
956 ^user.ap_id,
957 activity.recipients
958 )
959 )
960 end
961
962 defp restrict_replies(query, %{
963 reply_filtering_user: %User{} = user,
964 reply_visibility: "following"
965 }) do
966 from(
967 [activity, object] in query,
968 where:
969 fragment(
970 """
971 ?->>'type' != 'Create' -- This isn't a Create
972 OR ?->>'inReplyTo' is null -- this isn't a reply
973 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
974 -- unless they are the author (because authors
975 -- are also part of the recipients). This leads
976 -- to a bug that self-replies by friends won't
977 -- show up.
978 OR ? = ? -- The actor is us
979 """,
980 activity.data,
981 object.data,
982 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
983 activity.recipients,
984 activity.actor,
985 activity.actor,
986 ^user.ap_id
987 )
988 )
989 end
990
991 defp restrict_replies(query, _), do: query
992
993 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
994 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
995 end
996
997 defp restrict_reblogs(query, _), do: query
998
999 defp restrict_muted(query, %{with_muted: true}), do: query
1000
1001 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1002 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1003
1004 query =
1005 from([activity] in query,
1006 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1007 where:
1008 fragment(
1009 "not (?->'to' \\?| ?) or ? = ?",
1010 activity.data,
1011 ^mutes,
1012 activity.actor,
1013 ^user.ap_id
1014 )
1015 )
1016
1017 unless opts[:skip_preload] do
1018 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1019 else
1020 query
1021 end
1022 end
1023
1024 defp restrict_muted(query, _), do: query
1025
1026 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1027 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1028 domain_blocks = user.domain_blocks || []
1029
1030 following_ap_ids = User.get_friends_ap_ids(user)
1031
1032 query =
1033 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1034
1035 from(
1036 [activity, object: o] in query,
1037 # You don't block the author
1038 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1039
1040 # You don't block any recipients, and didn't author the post
1041 where:
1042 fragment(
1043 "((not (? && ?)) or ? = ?)",
1044 activity.recipients,
1045 ^blocked_ap_ids,
1046 activity.actor,
1047 ^user.ap_id
1048 ),
1049
1050 # You don't block the domain of any recipients, and didn't author the post
1051 where:
1052 fragment(
1053 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1054 activity.recipients,
1055 ^domain_blocks,
1056 activity.actor,
1057 ^user.ap_id
1058 ),
1059
1060 # It's not a boost of a user you block
1061 where:
1062 fragment(
1063 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1064 activity.data,
1065 activity.data,
1066 ^blocked_ap_ids
1067 ),
1068
1069 # You don't block the author's domain, and also don't follow the author
1070 where:
1071 fragment(
1072 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1073 activity.actor,
1074 ^domain_blocks,
1075 activity.actor,
1076 ^following_ap_ids
1077 ),
1078
1079 # Same as above, but checks the Object
1080 where:
1081 fragment(
1082 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1083 o.data,
1084 ^domain_blocks,
1085 o.data,
1086 ^following_ap_ids
1087 )
1088 )
1089 end
1090
1091 defp restrict_blocked(query, _), do: query
1092
1093 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1094 if Config.get([:activitypub, :blockers_visible]) == true do
1095 query
1096 else
1097 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1098
1099 from(
1100 activity in query,
1101 # The author doesn't block you
1102 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1103
1104 # It's not a boost of a user that blocks you
1105 where:
1106 fragment(
1107 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1108 activity.data,
1109 activity.data,
1110 ^blocker_ap_ids
1111 )
1112 )
1113 end
1114 end
1115
1116 defp restrict_blockers_visibility(query, _), do: query
1117
1118 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1119 from(
1120 activity in query,
1121 where:
1122 fragment(
1123 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1124 activity.data,
1125 ^[Constants.as_public()]
1126 )
1127 )
1128 end
1129
1130 defp restrict_unlisted(query, _), do: query
1131
1132 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1133 from(
1134 [activity, object: o] in query,
1135 where:
1136 fragment(
1137 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1138 activity.data,
1139 activity.data,
1140 activity.data,
1141 ^ids
1142 )
1143 )
1144 end
1145
1146 defp restrict_pinned(query, _), do: query
1147
1148 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1149 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1150
1151 from(
1152 activity in query,
1153 where:
1154 fragment(
1155 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1156 activity.data,
1157 activity.actor,
1158 ^muted_reblogs
1159 )
1160 )
1161 end
1162
1163 defp restrict_muted_reblogs(query, _), do: query
1164
1165 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1166 from(
1167 activity in query,
1168 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1169 )
1170 end
1171
1172 defp restrict_instance(query, _), do: query
1173
1174 defp restrict_filtered(query, %{user: %User{} = user}) do
1175 case Filter.compose_regex(user) do
1176 nil ->
1177 query
1178
1179 regex ->
1180 from([activity, object] in query,
1181 where:
1182 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1183 activity.actor == ^user.ap_id
1184 )
1185 end
1186 end
1187
1188 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1189 restrict_filtered(query, %{user: user})
1190 end
1191
1192 defp restrict_filtered(query, _), do: query
1193
1194 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1195
1196 defp exclude_poll_votes(query, _) do
1197 if has_named_binding?(query, :object) do
1198 from([activity, object: o] in query,
1199 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1200 )
1201 else
1202 query
1203 end
1204 end
1205
1206 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1207
1208 defp exclude_chat_messages(query, _) do
1209 if has_named_binding?(query, :object) do
1210 from([activity, object: o] in query,
1211 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1212 )
1213 else
1214 query
1215 end
1216 end
1217
1218 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1219
1220 defp exclude_invisible_actors(query, _opts) do
1221 invisible_ap_ids =
1222 User.Query.build(%{invisible: true, select: [:ap_id]})
1223 |> Repo.all()
1224 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1225
1226 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1227 end
1228
1229 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1230 from(activity in query, where: activity.id != ^id)
1231 end
1232
1233 defp exclude_id(query, _), do: query
1234
1235 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1236
1237 defp maybe_preload_objects(query, _) do
1238 query
1239 |> Activity.with_preloaded_object()
1240 end
1241
1242 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1243
1244 defp maybe_preload_bookmarks(query, opts) do
1245 query
1246 |> Activity.with_preloaded_bookmark(opts[:user])
1247 end
1248
1249 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1250 query
1251 |> Activity.with_preloaded_report_notes()
1252 end
1253
1254 defp maybe_preload_report_notes(query, _), do: query
1255
1256 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1257
1258 defp maybe_set_thread_muted_field(query, opts) do
1259 query
1260 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1261 end
1262
1263 defp maybe_order(query, %{order: :desc}) do
1264 query
1265 |> order_by(desc: :id)
1266 end
1267
1268 defp maybe_order(query, %{order: :asc}) do
1269 query
1270 |> order_by(asc: :id)
1271 end
1272
1273 defp maybe_order(query, _), do: query
1274
1275 defp normalize_fetch_activities_query_opts(opts) do
1276 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1277 case opts[key] do
1278 value when is_bitstring(value) ->
1279 Map.put(opts, key, Hashtag.normalize_name(value))
1280
1281 value when is_list(value) ->
1282 normalized_value =
1283 value
1284 |> Enum.map(&Hashtag.normalize_name/1)
1285 |> Enum.uniq()
1286
1287 Map.put(opts, key, normalized_value)
1288
1289 _ ->
1290 opts
1291 end
1292 end)
1293 end
1294
1295 defp fetch_activities_query_ap_ids_ops(opts) do
1296 source_user = opts[:muting_user]
1297 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1298
1299 ap_id_relationships =
1300 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1301 [:block | ap_id_relationships]
1302 else
1303 ap_id_relationships
1304 end
1305
1306 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1307
1308 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1309 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1310
1311 restrict_muted_reblogs_opts =
1312 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1313
1314 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1315 end
1316
1317 def fetch_activities_query(recipients, opts \\ %{}) do
1318 opts = normalize_fetch_activities_query_opts(opts)
1319
1320 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1321 fetch_activities_query_ap_ids_ops(opts)
1322
1323 config = %{
1324 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1325 }
1326
1327 query =
1328 Activity
1329 |> maybe_preload_objects(opts)
1330 |> maybe_preload_bookmarks(opts)
1331 |> maybe_preload_report_notes(opts)
1332 |> maybe_set_thread_muted_field(opts)
1333 |> maybe_order(opts)
1334 |> restrict_recipients(recipients, opts[:user])
1335 |> restrict_replies(opts)
1336 |> restrict_since(opts)
1337 |> restrict_local(opts)
1338 |> restrict_remote(opts)
1339 |> restrict_actor(opts)
1340 |> restrict_type(opts)
1341 |> restrict_state(opts)
1342 |> restrict_favorited_by(opts)
1343 |> restrict_blocked(restrict_blocked_opts)
1344 |> restrict_blockers_visibility(opts)
1345 |> restrict_muted(restrict_muted_opts)
1346 |> restrict_filtered(opts)
1347 |> restrict_media(opts)
1348 |> restrict_visibility(opts)
1349 |> restrict_thread_visibility(opts, config)
1350 |> restrict_reblogs(opts)
1351 |> restrict_pinned(opts)
1352 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1353 |> restrict_instance(opts)
1354 |> restrict_announce_object_actor(opts)
1355 |> restrict_filtered(opts)
1356 |> Activity.restrict_deactivated_users()
1357 |> exclude_poll_votes(opts)
1358 |> exclude_chat_messages(opts)
1359 |> exclude_invisible_actors(opts)
1360 |> exclude_visibility(opts)
1361
1362 if Config.feature_enabled?(:improved_hashtag_timeline) do
1363 query
1364 |> restrict_hashtag_any(opts)
1365 |> restrict_hashtag_all(opts)
1366 |> restrict_hashtag_reject_any(opts)
1367 else
1368 query
1369 |> restrict_embedded_tag_any(opts)
1370 |> restrict_embedded_tag_all(opts)
1371 |> restrict_embedded_tag_reject_any(opts)
1372 end
1373 end
1374
1375 @doc """
1376 Fetch favorites activities of user with order by sort adds to favorites
1377 """
1378 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1379 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1380 user.ap_id
1381 |> Activity.Queries.by_actor()
1382 |> Activity.Queries.by_type("Like")
1383 |> Activity.with_joined_object()
1384 |> Object.with_joined_activity()
1385 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1386 |> order_by([like, _, _], desc_nulls_last: like.id)
1387 |> Pagination.fetch_paginated(
1388 Map.merge(params, %{skip_order: true}),
1389 pagination
1390 )
1391 end
1392
1393 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1394 Enum.map(activities, fn
1395 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1396 if Enum.any?(bcc, &(&1 in list_memberships)) do
1397 update_in(activity.data["cc"], &[user_ap_id | &1])
1398 else
1399 activity
1400 end
1401
1402 activity ->
1403 activity
1404 end)
1405 end
1406
1407 defp maybe_update_cc(activities, _, _), do: activities
1408
1409 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1410 from(activity in query,
1411 where:
1412 fragment("? && ?", activity.recipients, ^recipients) or
1413 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1414 ^Constants.as_public() in activity.recipients)
1415 )
1416 end
1417
1418 def fetch_activities_bounded(
1419 recipients,
1420 recipients_with_public,
1421 opts \\ %{},
1422 pagination \\ :keyset
1423 ) do
1424 fetch_activities_query([], opts)
1425 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1426 |> Pagination.fetch_paginated(opts, pagination)
1427 |> Enum.reverse()
1428 end
1429
1430 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1431 def upload(file, opts \\ []) do
1432 with {:ok, data} <- Upload.store(file, opts) do
1433 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1434
1435 Repo.insert(%Object{data: obj_data})
1436 end
1437 end
1438
1439 @spec get_actor_url(any()) :: binary() | nil
1440 defp get_actor_url(url) when is_binary(url), do: url
1441 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1442
1443 defp get_actor_url(url) when is_list(url) do
1444 url
1445 |> List.first()
1446 |> get_actor_url()
1447 end
1448
1449 defp get_actor_url(_url), do: nil
1450
1451 defp normalize_image(%{"url" => url}) do
1452 %{
1453 "type" => "Image",
1454 "url" => [%{"href" => url}]
1455 }
1456 end
1457
1458 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1459 defp normalize_image(_), do: nil
1460
1461 defp object_to_user_data(data) do
1462 fields =
1463 data
1464 |> Map.get("attachment", [])
1465 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1466 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1467
1468 emojis =
1469 data
1470 |> Map.get("tag", [])
1471 |> Enum.filter(fn
1472 %{"type" => "Emoji"} -> true
1473 _ -> false
1474 end)
1475 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1476 {String.trim(name, ":"), url}
1477 end)
1478
1479 is_locked = data["manuallyApprovesFollowers"] || false
1480 capabilities = data["capabilities"] || %{}
1481 accepts_chat_messages = capabilities["acceptsChatMessages"]
1482 data = Transmogrifier.maybe_fix_user_object(data)
1483 is_discoverable = data["discoverable"] || false
1484 invisible = data["invisible"] || false
1485 actor_type = data["type"] || "Person"
1486
1487 featured_address = data["featured"]
1488 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1489
1490 public_key =
1491 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1492 data["publicKey"]["publicKeyPem"]
1493 else
1494 nil
1495 end
1496
1497 shared_inbox =
1498 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1499 data["endpoints"]["sharedInbox"]
1500 else
1501 nil
1502 end
1503
1504 user_data = %{
1505 ap_id: data["id"],
1506 uri: get_actor_url(data["url"]),
1507 ap_enabled: true,
1508 banner: normalize_image(data["image"]),
1509 fields: fields,
1510 emoji: emojis,
1511 is_locked: is_locked,
1512 is_discoverable: is_discoverable,
1513 invisible: invisible,
1514 avatar: normalize_image(data["icon"]),
1515 name: data["name"],
1516 follower_address: data["followers"],
1517 following_address: data["following"],
1518 featured_address: featured_address,
1519 bio: data["summary"] || "",
1520 actor_type: actor_type,
1521 also_known_as: Map.get(data, "alsoKnownAs", []),
1522 public_key: public_key,
1523 inbox: data["inbox"],
1524 shared_inbox: shared_inbox,
1525 accepts_chat_messages: accepts_chat_messages,
1526 pinned_objects: pinned_objects
1527 }
1528
1529 # nickname can be nil because of virtual actors
1530 if data["preferredUsername"] do
1531 Map.put(
1532 user_data,
1533 :nickname,
1534 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1535 )
1536 else
1537 Map.put(user_data, :nickname, nil)
1538 end
1539 end
1540
1541 def fetch_follow_information_for_user(user) do
1542 with {:ok, following_data} <-
1543 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1544 {:ok, hide_follows} <- collection_private(following_data),
1545 {:ok, followers_data} <-
1546 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1547 {:ok, hide_followers} <- collection_private(followers_data) do
1548 {:ok,
1549 %{
1550 hide_follows: hide_follows,
1551 follower_count: normalize_counter(followers_data["totalItems"]),
1552 following_count: normalize_counter(following_data["totalItems"]),
1553 hide_followers: hide_followers
1554 }}
1555 else
1556 {:error, _} = e -> e
1557 e -> {:error, e}
1558 end
1559 end
1560
1561 defp normalize_counter(counter) when is_integer(counter), do: counter
1562 defp normalize_counter(_), do: 0
1563
1564 def maybe_update_follow_information(user_data) do
1565 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1566 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1567 {_, true} <-
1568 {:collections_available,
1569 !!(user_data[:following_address] && user_data[:follower_address])},
1570 {:ok, info} <-
1571 fetch_follow_information_for_user(user_data) do
1572 info = Map.merge(user_data[:info] || %{}, info)
1573
1574 user_data
1575 |> Map.put(:info, info)
1576 else
1577 {:user_type_check, false} ->
1578 user_data
1579
1580 {:collections_available, false} ->
1581 user_data
1582
1583 {:enabled, false} ->
1584 user_data
1585
1586 e ->
1587 Logger.error(
1588 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1589 )
1590
1591 user_data
1592 end
1593 end
1594
1595 defp collection_private(%{"first" => %{"type" => type}})
1596 when type in ["CollectionPage", "OrderedCollectionPage"],
1597 do: {:ok, false}
1598
1599 defp collection_private(%{"first" => first}) do
1600 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1601 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1602 {:ok, false}
1603 else
1604 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1605 {:error, _} = e -> e
1606 e -> {:error, e}
1607 end
1608 end
1609
1610 defp collection_private(_data), do: {:ok, true}
1611
1612 def user_data_from_user_object(data) do
1613 with {:ok, data} <- MRF.filter(data) do
1614 {:ok, object_to_user_data(data)}
1615 else
1616 e -> {:error, e}
1617 end
1618 end
1619
1620 def fetch_and_prepare_user_from_ap_id(ap_id) do
1621 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1622 {:ok, data} <- user_data_from_user_object(data) do
1623 {:ok, maybe_update_follow_information(data)}
1624 else
1625 # If this has been deleted, only log a debug and not an error
1626 {:error, "Object has been deleted" = e} ->
1627 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1628 {:error, e}
1629
1630 {:error, {:reject, reason} = e} ->
1631 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1632 {:error, e}
1633
1634 {:error, e} ->
1635 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1636 {:error, e}
1637 end
1638 end
1639
1640 def maybe_handle_clashing_nickname(data) do
1641 with nickname when is_binary(nickname) <- data[:nickname],
1642 %User{} = old_user <- User.get_by_nickname(nickname),
1643 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1644 Logger.info(
1645 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1646 )
1647
1648 old_user
1649 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1650 |> User.update_and_set_cache()
1651 else
1652 {:ap_id_comparison, true} ->
1653 Logger.info(
1654 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1655 )
1656
1657 _ ->
1658 nil
1659 end
1660 end
1661
1662 def pin_data_from_featured_collection(%{
1663 "type" => type,
1664 "orderedItems" => objects
1665 })
1666 when type in ["OrderedCollection", "Collection"] do
1667 Map.new(objects, fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1668 end
1669
1670 def fetch_and_prepare_featured_from_ap_id(nil) do
1671 {:ok, %{}}
1672 end
1673
1674 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1675 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1676 {:ok, pin_data_from_featured_collection(data)}
1677 else
1678 e ->
1679 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1680 {:ok, %{}}
1681 end
1682 end
1683
1684 def pinned_fetch_task(nil), do: nil
1685
1686 def pinned_fetch_task(%{pinned_objects: pins}) do
1687 if Enum.all?(pins, fn {ap_id, _} ->
1688 Object.get_cached_by_ap_id(ap_id) ||
1689 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1690 end) do
1691 :ok
1692 else
1693 :error
1694 end
1695 end
1696
1697 def make_user_from_ap_id(ap_id) do
1698 user = User.get_cached_by_ap_id(ap_id)
1699
1700 if user && !User.ap_enabled?(user) do
1701 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1702 else
1703 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1704 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1705
1706 if user do
1707 user
1708 |> User.remote_user_changeset(data)
1709 |> User.update_and_set_cache()
1710 else
1711 maybe_handle_clashing_nickname(data)
1712
1713 data
1714 |> User.remote_user_changeset()
1715 |> Repo.insert()
1716 |> User.set_cache()
1717 end
1718 end
1719 end
1720 end
1721
1722 def make_user_from_nickname(nickname) do
1723 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1724 make_user_from_ap_id(ap_id)
1725 else
1726 _e -> {:error, "No AP id in WebFinger"}
1727 end
1728 end
1729
1730 # filter out broken threads
1731 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1732 entire_thread_visible_for_user?(activity, user)
1733 end
1734
1735 # do post-processing on a specific activity
1736 def contain_activity(%Activity{} = activity, %User{} = user) do
1737 contain_broken_threads(activity, user)
1738 end
1739
1740 def fetch_direct_messages_query do
1741 Activity
1742 |> restrict_type(%{type: "Create"})
1743 |> restrict_visibility(%{visibility: "direct"})
1744 |> order_by([activity], asc: activity.id)
1745 end
1746 end