Initial meilisearch implementation, doesn't delete posts yet
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
29
30 import Ecto.Query
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
33
34 require Logger
35 require Pleroma.Constants
36
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
# Builds the recipient list for an activity map.
#
# Returns `{recipients, to, cc}`: `to` and `cc` are the values of those keys
# (default `[]`), `recipients` is the concatenation of "to", "cc" and "bcc".
# For "Create" activities the actor is appended as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing (or nil) actor must contribute nothing. Defaulting to `[]` and then
  # wrapping it would put an empty list into the recipients as an element.
  actor_recipients =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor_recipients] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
56
# Decides whether the activity may be inserted on behalf of its actor.
#
# "Delete" and "Undo" activities always pass. Otherwise, when "actor" is a binary,
# the cached user for that AP id must be a `%User{}` with `is_active: true`.
# Every other input passes.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => ap_id}) when is_binary(ap_id) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(ap_id))
end

defp check_actor_can_insert(_activity), do: true
68
# Checks the embedded object's "content" against the `[:instance, :remote_limit]`
# setting. Returns true when the content length is within the limit.
#
# Only binary content is measured. Activities without an embedded object, without
# content, or with non-binary content (which would make `String.length/1` raise)
# pass the check.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
75
# Increases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
79
# Decreases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
83
# Updates the actor's last-status timestamp when `object` is public; otherwise
# returns `{:ok, actor}` untouched.
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
87
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies count
# of the replied-to object, but only when the new object is public (otherwise nil).
# Anything else is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_other), do: :noop
98
@object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]

# Persists either an object (when its "type" is one of @object_types) or an
# activity (everything else). Returns `{:ok, stored, meta}` on success; a failing
# step's result is returned as-is.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  # `:local` is mandatory in `meta`; `Keyword.fetch!/2` raises when it is absent.
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
123
# Inserts an activity map into the database.
#
# Arguments:
#   * `map` - the activity data
#   * `local` - stored in the `local` field of the resulting `%Activity{}`
#   * `fake` - when true nothing is persisted; an in-memory `%Activity{}` with the id
#     "pleroma:fakeid" is returned instead
#   * `bypass_actor_check` - when true, `check_actor_can_insert/1` is not consulted
#
# Returns `{:ok, activity}` on success, and also when `Activity.normalize/1` already
# yields an activity for `map`. Failures: `{:error, false}` (actor check),
# `{:error, :remote_limit}` (remote limit check), `{:error, {:reject, _}}` for a
# `{:reject, _}` result (presumably from `MRF.filter/1` - confirm), or any other
# `{:error, _}` produced by a step.
# NOTE(review): a failed containment check returns the `{:containment, _}` tuple
# itself, which is not covered by the @spec below.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # Fake inserts leave the happy path here, carrying the filtered map and recipients.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media data is fetched in a separate, unlinked task.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # The search backend is read from config on every insert.
    search_module = Pleroma.Config.get([Pleroma.Search, :module])

    # Indexing also runs in a separate, unlinked task.
    ConcurrentLimiter.limit(Pleroma.Search, fn ->
      Task.start(fn -> search_module.add_to_index(activity) end)
    end)

    {:ok, activity}
  else
    # `Activity.normalize/1` returned an activity, so nothing new is inserted.
    %Activity{} = activity ->
      {:ok, activity}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error

    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      # Unlike the real insert, rich media is fetched in the calling process here.
      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = e ->
      {:error, e}
  end
end
182
# Inserts a new `%Activity{}` built from the arguments and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
195
# Creates notifications for the activity, creates or bumps its conversation, then
# streams the activity followed by the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
204
# When the activity data carries a `%DateTime{}` under "expires_at", enqueues a
# `PurgeExpiredActivity` job for it. Returns `{:ok, activity}` on success or when
# there is nothing to schedule; a failed enqueue result is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
218
# Creates or bumps the conversation for `activity` and marks it as read for the
# cached user behind `actor`. Returns `{:ok, conversation}`; if either lookup does
# not produce the expected value, that value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
226
# Returns the freshly (force-)preloaded participations of a conversation wrapped in
# `{:ok, _}`, or `[]` for any other input.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_no_conversation), do: []
234
# Preloads the user of each participation and streams them on the "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
242
# Streams the participations of the conversation identified by the object's
# "context", provided a direct activity for that context can still be fetched for
# `user`. Inputs without a "context" are a `:noop`.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
262
# Streams "Create", "Announce" and "Delete" activities to the topics computed by
# `Topics.get_activity_topics/1`; every other input is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ~w[Create Announce Delete] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
275
# Runs `do_create/2` inside a database transaction and unwraps its result.
# A rolled-back or failed transaction result is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
282
# Builds and inserts a "Create" activity; `create/2` runs this inside a transaction.
#
# Steps after a successful insert, in order:
#   1. fake inserts return `{:ok, activity}` immediately;
#   2. the replies count of the replied-to object is bumped (public replies only);
#   3. when the `:env` config is `:benchmark`, return `{:ok, activity}` here;
#   4. note count and last-status timestamp of the actor are updated (public only);
#   5. notifications/streaming, poll-end scheduling and federation.
# Any `{:error, message}` rolls the transaction back with `message`.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
317
# Asks `PollWorker` to schedule the poll-end job for the activity. The worker's
# result is deliberately discarded; this always returns `:ok`.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
322
# Builds a listen activity from `params`, inserts it, then notifies/streams and
# federates it. Returns `{:ok, activity}`, or the first non-matching step result.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # Anything other than an explicit `false` counts as local.
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
342
# Runs `do_unfollow/4` inside a database transaction and unwraps its result.
# A rolled-back or failed transaction result is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
351
# Transaction body of `unfollow/4`.
#
# Looks up the latest follow activity between the two users, sets its state to
# "cancelled", then inserts, notifies/streams and federates the unfollow activity.
# Returns `{:ok, activity}`, `nil` when a step yields nil (e.g. no follow activity
# was found), and rolls the transaction back on `{:error, error}`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end
365
# Runs `do_flag/1` inside a database transaction and unwraps its result.
# A rolled-back or failed transaction result is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
372
# Transaction body of `flag/1`: inserts a report activity, federates a stripped
# copy of it and e-mails the report to superusers.
#
# `params[:forward]` (anything but an explicit `false` counts as true) decides
# whether the reported account's AP id is put into "cc"; "to" is always emptied.
# Returns `{:ok, activity}` (the unstripped activity) and rolls the transaction
# back on `{:error, error}`.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = !(params[:local] == false)
  forward = !(params[:forward] == false)

  additional = params[:additional] || %{}

  additional =
    if forward do
      Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
    else
      Map.merge(additional, %{"to" => [], "cc" => []})
    end

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <-
         maybe_federate(stripped_activity) do
    # Mail every superuser that has an e-mail address, except the reporter.
    # NOTE(review): `actor` is also passed to `AdminEmail.report/5` as the reporter;
    # if it is a `%User{}` rather than an AP id string, `user.ap_id != actor` is
    # always true and the reporter is never excluded - confirm against callers.
    User.all_superusers()
    |> Enum.filter(fn user -> user.ap_id != actor end)
    |> Enum.filter(fn user -> not is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
415
# Inserts a "Move" activity from `origin` to `target` and enqueues the
# "move_following" background job.
#
# The move is only accepted when `origin.ap_id` is listed in `target.also_known_as`;
# otherwise an `{:error, message}` tuple is returned. Any other failing step result
# is returned unchanged.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(params, local),
       _ <- notify_and_stream(activity) do
    # The federation result is not checked here.
    maybe_federate(activity)

    BackgroundWorker.enqueue("move_following", %{
      "origin_id" => origin.id,
      "target_id" => target.id
    })

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
441
# Builds the query for all "Create" activities of a context (thread), newest first.
#
# Visible recipients are the public collection plus, when `opts[:user]` is set, that
# user's own AP id and the result of `User.following/1` for that user.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  viewer = opts[:user]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, viewer)
  |> restrict_filtered(opts)
  |> where(
    [a],
    fragment("?->>'type' = ? and ?->>'context' = ?", a.data, "Create", a.data, ^context)
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
472
# Runs `fetch_activities_for_context_query/2` and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
479
# Returns the id of the newest direct-visibility activity in the context, or nil.
# Preloading is skipped unless `opts` overrides `:skip_preload`.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
490
# Paginates `query` with `:skip_extra_order` forced on.
defp fetch_paginated_optimized(query, opts, pagination) do
  # Tag-filtering functions may already apply "ORDER BY objects.id DESC"; an extra
  # sort on "activities.id DESC NULLS LAST" would worsen the query plan.
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
498
# Fetches a page of activities addressed to `recipients` or to the list memberships
# of `opts[:user]`. The page is reversed and passed through `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
507
# Fetches a page of activities addressed to the public collection. The `:user`
# option is dropped before querying.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], opts)

  query
  |> restrict_unlisted(opts)
  |> fetch_paginated_optimized(opts, pagination)
end
517
# Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted` forced on.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
524
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed visibility equals `:visibility`
# (a single value) or is one of `:visibility` (a list).
#
# Always returns a query. An invalid visibility is logged and the query is returned
# unrestricted, mirroring `exclude_visibility/2`; returning the logger's `:ok` here
# would break every caller that pipes the result into further query builders.
# A nil or absent `:visibility` means "no restriction". `inspect/1` is used for
# logging because interpolating a list of non-strings can itself raise.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
561
# Removes activities whose computed visibility equals (single value) or is among
# (list) `:exclude_visibilities`. Invalid values are logged and the query is
# returned unchanged; nil or an absent key means "exclude nothing".
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  else
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _opts), do: query
604
# Applies the `thread_visibility` SQL function for the viewing user, unless thread
# containment is skipped via the third argument or via the user's own
# `skip_thread_containment` flag. Without a `%User{}` the query is left untouched.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: viewer_ap_id}}, _) do
  where(
    query,
    [activity],
    fragment("thread_visibility(?, (?)->>'id') = true", ^viewer_ap_id, activity.data)
  )
end

defp restrict_thread_visibility(query, _opts, _), do: query
619
# Fetches activities authored by `user` as seen by `reading_user`, without
# restricting the activity type. The result of `fetch_activities/2` is reversed
# once more before it is returned.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
634
# Fetches the "Create"/"Announce" activities of `user` as seen by `reading_user`
# and reverses them. With `total: true` the result is a keyword list and only its
# `:items` entry is reversed.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  Enum.reverse(fetch_activities_for_user(user, reading_user, params))
end
648
# Shared worker for `fetch_user_activities/3`: fixes the type filter to
# "Create"/"Announce", scopes to `user` as actor and passes the user's pinned object
# ids along. Block and mute filtering for `reading_user` is only enabled when
# `reading_user` does not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  scoped =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      scoped
    else
      Map.merge(scoped, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
675
# Fetches "Create"/"Announce" activities visible to `reading_user` and reverses
# them. With `total: true` the result is a keyword list and only its `:items` entry
# is reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  Enum.reverse(fetch_activities_for_reading_user(reading_user, params))
end
686
# Shared worker for `fetch_statuses/2`: fixes the type filter to "Create"/"Announce"
# and always uses offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
697
# Recipient list used when reading a user's activities: empty in godmode, otherwise
# the public collection plus, for a logged-in reader, the reader's AP id and the
# result of `User.following/1`.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: viewer}) do
  public = [Constants.as_public()]

  if viewer do
    public ++ [viewer.ap_id | User.following(viewer)]
  else
    public
  end
end
707
# Drops "Announce" activities whose announced object was authored by
# `:announce_filtering_user`. This needs the joined object, so combining the option
# with `skip_preload: true` raises.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: author_ap_id}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^author_ap_id
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
727
# Keeps only activities with an id greater than `:since_id`. An empty string or an
# absent key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
735
# Keeps objects whose embedded 'tag' array contains every tag in `:tag_all`.
# A single binary tag is delegated to `restrict_embedded_tag_any/2`. Needs the
# joined object, so `skip_preload: true` raises.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _opts), do: query
752
# Keeps objects whose embedded 'tag' array contains at least one tag from `:tag`
# (a non-empty list or a single binary). Needs the joined object, so
# `skip_preload: true` raises.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = wanted_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^wanted_tags)
  )
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _opts), do: query
769
# Drops objects whose embedded 'tag' array contains any tag from `:tag_reject`
# (a non-empty list or a single binary). Needs the joined object, so
# `skip_preload: true` raises.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag}) when is_binary(tag) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag]})
end

defp restrict_embedded_tag_reject_any(query, _opts), do: query
787
# Query selecting the distinct ids of all objects linked (via "hashtags_objects")
# to a hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
795
# Keeps objects that are linked to every hashtag named in `:tag_all`.
#
# A one-element list is delegated to `restrict_hashtag_any/2`; a single binary is
# wrapped into a list first. Needs the joined object, so `skip_preload: true` raises.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  # The subquery aggregates the object's hashtag names that are among `tags`;
  # `@>` then requires that aggregate to contain all of `tags`.
  from(
    [_activity, object] in query,
    where:
      fragment(
        """
        (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
        ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
        AND hashtags_objects.object_id = ?) @> ?
        """,
        ^tags,
        object.id,
        ^tags
      )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
826
# Keeps objects linked to at least one hashtag named in `:tag` (a non-empty list or
# a single binary). Needs the joined object, so `skip_preload: true` raises.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  # The hashtag ids are resolved with a separate query up front and passed to the
  # main query as a plain list.
  hashtag_ids =
    from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
852
# Drops objects linked to any hashtag named in `:tag_reject` (a non-empty list or a
# single binary). Needs the joined object, so `skip_preload: true` raises.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  rejected_object_ids = object_ids_query_for_tags(tags_reject)

  where(query, [_activity, object], object.id not in subquery(rejected_object_ids))
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag}) when is_binary(tag) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag]})
end

defp restrict_hashtag_reject_any(query, _opts), do: query
869
# Raises a RuntimeError for filters that need the joined object while preloading
# is skipped.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
873
# Keeps activities whose recipients overlap with `recipients`. With a user, that
# user's own activities are OR-ed in as well. An empty recipient list applies no
# restriction at all.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
887
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
893
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
899
# Keeps only activities whose actor equals `:actor_id`, when that option is given.
defp restrict_actor(query, %{actor_id: actor_ap_id}) do
  where(query, [activity], activity.actor == ^actor_ap_id)
end

defp restrict_actor(query, _opts), do: query
905
# Filters on the activity's 'type': equality for a single binary, `ANY` membership
# for any other `:type` value.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
915
# Keeps only activities whose 'state' equals `:state`, when that option is given.
defp restrict_state(query, %{state: wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _opts), do: query
921
# Keeps only activities whose object lists `:favorited_by` among its 'likes'.
defp restrict_favorited_by(query, %{favorited_by: liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
930
# With `only_media: true`, keeps "Create" activities whose object has a non-empty
# 'attachment'. Any `:only_media` value combined with `skip_preload: true` raises,
# because the joined object is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
944
# Reply filtering, selected by the options map:
#   * `exclude_replies: true` - keep only objects without 'inReplyTo';
#   * `reply_visibility: "self"` - keep non-replies and activities that have
#     `reply_filtering_user` among their recipients;
#   * `reply_visibility: "following"` - see the conditions spelled out in the SQL
#     fragment below;
#   * anything else - no filtering.
defp restrict_replies(query, %{exclude_replies: true}) do
  from(
    [_activity, object] in query,
    where: fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  from(
    [activity, object] in query,
    where:
      fragment(
        "?->>'inReplyTo' is null OR ? = ANY(?)",
        object.data,
        ^user.ap_id,
        activity.recipients
      )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  # The SQL comments inside the fragment are part of the query text.
  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create' -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
        -- unless they are the author (because authors
        -- are also part of the recipients). This leads
        -- to a bug that self-replies by friends won't
        -- show up.
        OR ? = ? -- The actor is us
        """,
        activity.data,
        object.data,
        ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _), do: query
998
# With `exclude_reblogs: true`, drops all "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
1004
# Hides activities from muted actors and activities addressed ('to') to muted
# users, except the viewer's own. Unless preloading is skipped, rows with a joined
# thread mute are dropped as well. `with_muted: true` disables all of this.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
1031
# Hides activities the `:blocking_user` should not see because of their user or
# domain blocks. Without a `%User{}` under `:blocking_user` the query is untouched.
#
# Blocked AP ids come from `opts[:blocked_users_ap_ids]` when given, otherwise from
# `User.blocked_users_ap_ids/1`. The object join is added when the query does not
# already carry a binding named `:object`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  query =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  from(
    [activity, object: o] in query,
    # You don't block the author
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),

    # You don't block any recipients, and didn't author the post
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),

    # You don't block the domain of any recipients, and didn't author the post
    where:
      fragment(
        "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
        activity.recipients,
        ^domain_blocks,
        activity.actor,
        ^user.ap_id
      ),

    # It's not a boost of a user you block
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),

    # You don't block the author's domain, and also don't follow the author
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),

    # Same as above, but checks the Object
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
1098
# Unless the instance sets `[:activitypub, :blockers_visible]` to true, hides
# activities from accounts that block `blocking_user`, including boosts of them.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

defp restrict_blockers_visibility(query, _opts), do: query
1123
# With `restrict_unlisted: true`, drops activities whose "cc" contains the public
# collection address.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public_addresses = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public_addresses
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1137
# With `pinned: true`, keeps only Create activities whose object id is one of
# `pinned_object_ids`. The object id is read from an embedded object or from a
# plain "object" reference.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      activity.data,
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _opts), do: query
1153
# Hides boosts made by accounts whose reblogs `muting_user` has muted.
# Pre-fetched ap_ids may be supplied as `:reblog_muted_users_ap_ids`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1170
# Keeps only activities whose actor URL host (third '/'-separated segment)
# equals `instance`.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  query
  |> where([activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _opts), do: query
1179
# Applies the user's content filters: activities whose object content matches the
# regex composed by `Filter.compose_regex/1` are hidden, except the user's own.
#
# The object is addressed through the :object named binding, which is joined here
# when missing (the same approach restrict_blocked/2 uses). A positional second
# binding is only the object when the object join happened to be added first, which
# is not guaranteed, e.g. when preloads are skipped.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      query =
        if has_named_binding?(query, :object),
          do: query,
          else: Activity.with_joined_object(query)

      from([activity, object: o] in query,
        where:
          fragment("not(?->>'content' ~* ?)", o.data, ^regex) or
            activity.actor == ^user.ap_id
      )
  end
end

# Falls back to the blocking user when no explicit `:user` is given.
defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1199
# Filters out poll votes (objects of type "Answer") unless `include_poll_votes: true`.
# Only applies when the query already carries the :object named binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1211
# Filters out chat messages (objects of type "ChatMessage") unless
# `include_chat_messages: true`. Only applies when the query already carries the
# :object named binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1223
# Drops activities authored by users flagged as invisible, unless
# `invisible_actors: true` is passed. The invisible users' ap_ids are loaded
# with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1234
# Excludes the activity with the given id when `exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
1240
# Preloads the activity's object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1247
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1254
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1261
# Sets the thread-muted field for the muting user (falling back to `opts[:user]`)
# unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1268
# Orders by id in the requested direction; without an `:order` of `:asc` or
# `:desc` the query is left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1280
# Normalizes the hashtag options (`:tag`, `:tag_all`, `:tag_reject`) through
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values are left as they are.
defp normalize_fetch_activities_query_opts(opts) do
  for key <- [:tag, :tag_all, :tag_reject], reduce: opts do
    acc ->
      case acc[key] do
        name when is_bitstring(name) ->
          Map.put(acc, key, Hashtag.normalize_name(name))

        names when is_list(names) ->
          unique_names = names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
          Map.put(acc, key, unique_names)

        _ ->
          acc
      end
  end
end
1300
# Loads the muting user's outgoing relationship ap_ids in a single call and
# returns three option maps (for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2) with the matching list added as a default.
# Blocks are only included when the blocking user is the same as the muting user.
# Values already present in `opts` take precedence over the preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == muting_user,
      do: [:block | mute_relationships],
      else: mute_relationships

  ap_ids = User.outgoing_relationships_ap_ids(muting_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, ap_ids[:reblog_mute])
  }
end
1322
@doc """
Builds the Ecto query used to fetch activities addressed to `recipients`.

`opts` is a map of filters; each `restrict_*`, `exclude_*` and `maybe_*` step in
the pipeline reads the keys it needs and leaves the query untouched otherwise.
Hashtag options are normalized first, and the muting/blocking user's relationship
ap_ids are preloaded once and handed to the steps that need them.

The final hashtag restrictions depend on whether the `:improved_hashtag_timeline`
feature is enabled.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # restrict_filtered/2 is applied exactly once: applying it a second time only
  # appended an identical regex condition to the WHERE clause.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1380
@doc """
Fetches the activities a user has favourited, ordered by descending id of the
corresponding `Like` activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  # Ordering is set explicitly below, so pagination must not add its own.
  pagination_params = Map.put(params, :skip_order, true)

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1398
# Adds the user's ap_id to the "cc" of every activity whose "bcc" contains one of
# the user's list memberships. Activities without a non-empty "bcc" list, and
# calls without memberships or a user, are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or not a list; List.wrap/1 keeps the result a
        # proper list instead of producing an improper one such as [id | nil].
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1414
# Keeps activities addressed to any of `recipients`, or public activities
# addressed to any of `recipients_with_public`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1423
@doc """
Fetches a page of activities bounded by `recipients` and `recipients_with_public`
(see `fetch_activities_bounded_query/3`) and returns the page in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1435
# Stores the file via `Upload.store/2` and inserts an Object holding the resulting
# data, tagged with `opts[:actor]` when present. A failed store is returned as is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      Repo.insert(%Object{data: Maps.put_if_present(data, "actor", opts[:actor])})

    not_stored ->
      not_stored
  end
end
1444
# Extracts a URL from an actor's "url" property, which may be a plain string,
# a link map with a binary "href", or a list of either (the first element is used).
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1456
# Normalizes an image property into an Image map whose "url" is a single-element
# list of link maps. A list is reduced to its first element; anything without a
# "url" key yields nil.
defp normalize_image(%{"url" => url}) do
  link = %{"href" => url}
  %{"type" => "Image", "url" => [link]}
end

defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_), do: nil
1466
# Converts a remote actor document (a string-keyed map) into the attribute map
# used to build or update a user.
#
# The document comes from a remote server, so "attachment" and "tag" are read
# defensively: a null value, a single object instead of a list, and malformed
# entries are skipped rather than raising.
defp object_to_user_data(data) do
  # Profile fields are the PropertyValue attachments, reduced to name/value.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  # Custom emoji: shortcode (surrounding colons trimmed) => image URL.
  # Emoji tags without an icon URL or a binary name are ignored.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false

  accepts_chat_messages =
    case data["capabilities"] do
      %{"acceptsChatMessages" => value} -> value
      _ -> nil
    end

  # The values below are read from the document after
  # Transmogrifier.maybe_fix_user_object/1 has been applied to it.
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1546
@doc """
Fetches the remote following and follower collections of `user` and returns
`{:ok, info}` with the counters and whether each collection is hidden.

Counters that are not integers are reported as 0. Any failure is returned as
`{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    info = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers,
      hide_follows: hide_follows
    }

    {:ok, info}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1566
# Returns the counter when it is an integer, otherwise 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1569
@doc """
Merges remote follow information into `user_data[:info]` when external user
synchronization is enabled, the user type is "Person" or "Service", and both
collection addresses are present.

`user_data` is returned unchanged when a precondition is not met. When fetching
fails, the failure is logged and `user_data` is returned unchanged as well.
"""
# NOTE(review): the type check reads `user_data[:type]`, while object_to_user_data/1
# in this file stores the actor type under `:actor_type` — confirm which key the
# callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1600
# Decides whether a follow collection is private.
#   * an embedded first page of a collection-page type -> public
#   * a "first" reference is fetched: a collection page -> public,
#     an HTTP 401/403 -> private, other failures -> error
#   * no "first" at all -> private
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1617
@doc """
Runs the actor document through `MRF.filter/1` and converts the result to user
data. Any result other than `{:ok, data}` is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1625
@doc """
Fetches the remote actor at `ap_id` and returns `{:ok, user_data}`, including
follow information when applicable. Errors are logged at a level that depends
on the reason and returned as `{:error, reason}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # If this has been deleted, only log a debug and not an error
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = reason} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, reason}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1645
@doc """
Renames an existing user who holds the nickname in `data` but has a different
ap_id, prefixing the old nickname with that user's id.

Returns nil when there is no nickname or no such user. When the ap_ids are
equal, only an info message is logged.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = existing <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == existing.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{existing.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = %{nickname: "#{existing.id}.#{existing.nickname}"}

    existing
    |> User.remote_user_changeset(renamed)
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1667
@doc """
Builds the pinned-objects map (`%{object_ap_id => pinned_at}`) from a featured
collection document.

Items are read from "orderedItems", falling back to "items". Each item may be
an object map with a binary "id" or a plain id string; other items are skipped.
A collection without inline items yields an empty map.

Anything that is not an `OrderedCollection` or `Collection` is logged and also
yields an empty map, so a malformed remote document cannot crash the caller.
"""
def pin_data_from_featured_collection(%{"type" => type} = collection)
    when type in ["OrderedCollection", "Collection"] do
  pinned_at = NaiveDateTime.utc_now()
  items = collection["orderedItems"] || collection["items"] || []

  items
  |> List.wrap()
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [{object_ap_id, pinned_at}]
    object_ap_id when is_binary(object_ap_id) -> [{object_ap_id, pinned_at}]
    _malformed -> []
  end)
  |> Map.new()
end

def pin_data_from_featured_collection(data) do
  Logger.error("Could not parse featured collection #{inspect(data)}")
  %{}
end
1675
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pinned_objects}`.

A nil address, or a fetch that does not return `{:ok, data}`, yields `{:ok, %{}}`;
the latter is logged as an error.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    failure ->
      Logger.error(
        "Could not decode featured collection at fetch #{ap_id}, #{inspect(failure)}"
      )

      {:ok, %{}}
  end
end
1689
@doc """
Makes sure every pinned object in the user data is available, either from the
cache or by fetching it. Returns `:ok` when all are available and `:error`
otherwise; checking stops at the first object that cannot be obtained.
Returns nil for nil input.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  available? = fn {ap_id, _pinned_at} ->
    Object.get_cached_by_ap_id(ap_id) ||
      match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  end

  if Enum.all?(pins, available?), do: :ok, else: :error
end
1702
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

A cached user that is not yet AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched, its pinned objects are fetched in a background
task, and the user is updated (when cached) or inserted (after resolving a
nickname clash). A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  user = User.get_cached_by_ap_id(ap_id)

  cond do
    user && !User.ap_enabled?(user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

          if user do
            user
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
          else
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        not_fetched ->
          not_fetched
      end
  end
end
1727
@doc """
Resolves `nickname` through WebFinger and creates the user from the resulting
ap_id. Returns `{:error, "No AP id in WebFinger"}` when no ap_id is found.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
1735
# Filters out broken threads: true only when the entire thread is visible to the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1740
# Post-processing for a single activity; currently only the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1745
@doc """
Returns a query for `Create` activities with direct visibility, ordered by
ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1752 end