local-only-fixed (#138)
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`, where `recipients` is the concatenation of
# the "to", "cc" and "bcc" fields (each defaulting to `[]`). For "Create"
# activities the actor is added as a recipient too and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 turns a missing/nil actor into `[]` instead of letting an
  # empty list (or nil) end up as an element of the flat recipient list.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Tells whether the actor of `data` is allowed to insert the activity.
# "Delete" and "Undo" activities always pass; for any other activity with a
# binary actor, the cached user must exist and be active. Activities without
# a binary "actor" pass as well.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
69
# Checks the embedded object's content length against the configured
# `[:instance, :remote_limit]`. Activities without non-nil content pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_), do: true
76
@doc """
Increases the note count of `actor` when `object` is public; otherwise
returns `{:ok, actor}` untouched.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
80
@doc """
Decreases the note count of `actor` when `object` is public; otherwise
returns `{:ok, actor}` untouched.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
84
@doc """
Updates the last-status timestamp of `actor` when `object` is public;
otherwise returns `{:ok, actor}` untouched.
"""
def update_last_status_at_if_public(actor, object) do
  cond do
    is_public?(object) -> User.update_last_status_at(actor)
    true -> {:ok, actor}
  end
end
88
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies
# counter of the replied-to object, but only when the new object is public
# (returns nil otherwise). Any other input is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  cond do
    is_public?(reply) -> Object.increase_replies_count(parent_ap_id)
    true -> nil
  end
end

defp increase_replies_count_if_reply(_create_data), do: :noop
99
@object_types ~w[Question Answer Audio Video Event Article Note Page]

# Persists either an object (when its type is one of @object_types) or an
# activity. Object types go through Object.create/1; everything else is
# inserted as an Activity, using the required `:local` entry of `meta`, and
# may get an expiration job scheduled.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    failure -> failure
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
124
@doc """
Inserts the activity described by `map`.

If the activity is already known (`Activity.normalize/1` finds it), that
activity is returned. Otherwise defaults are filled in, the actor and remote
content limit are checked, the MRF pipeline is applied and the activity
(together with its child object, if any) is stored.

With `fake` set, nothing is stored and an in-memory activity with the id
`"pleroma:fakeid"` is returned instead. `bypass_actor_check` skips the actor
check.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(prepared)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Add local posts to search index
    if local, do: Pleroma.Search.add_to_index(activity)

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct in memory without touching the database.
    {:fake, true, fake_data, recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = failure ->
      failure

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = failure ->
      failure
  end
end
180
# Inserts a new Activity row built from `data` and, on success, hands it to
# maybe_create_activity_expiration/1. Insert failures are returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  inserted =
    Repo.insert(%Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    })

  case inserted do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
193
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
202
# Enqueues a PurgeExpiredActivity job when the activity data carries a
# DateTime under "expires_at". Returns `{:ok, activity}` on success (or when
# there is nothing to schedule); an enqueue failure is returned unchanged.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# produce the expected value, that value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
224
# Returns the freshly (force-)preloaded participations of a conversation
# given as `{:ok, conversation}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
232
@doc """
Preloads the user of each participation and pushes the participations to the
"participation" stream.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
240
# Streams the participations of the conversation matching the object's
# context, provided a direct activity in that context can be fetched for
# `user`. Inputs without a "context" are a no-op.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
260
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# for them; every other activity type is a no-op.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
273
@doc """
Creates a "Create" activity from `params` inside a database transaction.

On a committed transaction the inner result is returned; a rolled-back
transaction's result is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
280
# Transaction body of create/2: builds the Create data, inserts it and then
# runs the follow-up steps (reply counter, note count, last-status timestamp,
# notifications/streaming, poll scheduling, federation). Fake inserts and
# the :benchmark environment return early; errors roll the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
315
# Asks PollWorker to schedule the poll-end job for `activity`; the worker's
# result is ignored and :ok is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
320
@doc """
Cancels the latest follow from `follower` to `followed` and inserts the
matching unfollow activity, all inside a database transaction.

Returns `nil` when there is no follow activity to cancel.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Transaction body of unfollow/4: marks the latest follow as "cancelled",
# inserts the unfollow activity, then notifies, streams and federates it.
# Returns nil when no follow exists; errors roll the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(follow_activity, "cancelled"),
       {:ok, activity} <-
         follower
         |> make_unfollow_data(followed, cancelled_follow, activity_id)
         |> insert(local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
343
@doc """
Creates a "Flag" (report) activity from `params` inside a database
transaction.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Transaction body of flag/1: inserts the report, federates a stripped copy
# and e-mails every superuser that has an e-mail address and is not the
# reporter. Unless `:forward` is explicitly false, the reported account is
# put in "cc". Errors roll the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
393
@doc """
Inserts a "Move" activity from `origin` to `target` and enqueues the
"move_following" background job.

The target must list the origin's AP id in `also_known_as`; otherwise an
error tuple is returned.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  job_args = %{"origin_id" => origin.id, "target_id" => target.id}

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
420
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are the public collection plus, when `opts[:user]` is set, that
user's AP id and the addresses returned by `User.following/1`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
451
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching
activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
458
@doc """
Returns the id of the most recent direct-visibility "Create" activity in
`context`, or `nil` when there is none. Preloading is skipped unless `opts`
overrides `:skip_preload`.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
469
# Paginates `query` with the `:skip_extra_order` option forced on.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  pagination_opts = Map.put(opts, :skip_extra_order, true)
  Pagination.fetch_paginated(query, pagination_opts, pagination)
end
477
@doc """
Fetches a page of activities addressed to `recipients` or to one of the
list memberships of `opts[:user]`. The paginated result is reversed before
being passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  all_recipients = recipients ++ list_memberships

  all_recipients
  |> fetch_activities_query(opts)
  |> fetch_paginated_optimized(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
486
@doc """
Fetches a page of activities addressed to the public collection and, when
`opts[:includes_local_public]` is truthy, to the local-public collection.
The `:user` option is dropped before querying.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  local_public =
    if Map.get(opts, :includes_local_public, false), do: [as_local_public()], else: []

  query_opts = Map.delete(opts, :user)

  [Constants.as_public() | local_public]
  |> fetch_activities_query(query_opts)
  |> restrict_unlisted(query_opts)
  |> fetch_paginated_optimized(query_opts, pagination)
end
505
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with the
`:restrict_unlisted` option forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
512
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` equals the
# requested `:visibility` (a single value) or is one of them (a list).
#
# An unsupported visibility is logged and the query is returned unchanged,
# mirroring exclude_visibility/2. Previously these branches returned the
# `:ok` of Logger.error/1 instead of a query, which broke the calling
# pipeline; an explicit `visibility: nil` hit the same path. The value is
# logged with inspect/1 because interpolating arbitrary terms can itself
# raise.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# `nil` means "no restriction requested" and falls through to the last clause.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
549
# Drops activities whose `activity_visibility(...)` equals the given
# `:exclude_visibilities` value, or is one of them when a list is given.
# Unsupported values are logged and leave the query unchanged; nil or a
# missing option is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
592
# Applies the `thread_visibility(...)` SQL check for the reading user, unless
# thread containment is skipped through the third argument or on the user.
# Without a user the query is returned unchanged.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _, _), do: query
609
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type. The result of `fetch_activities/2` is
reversed once more before being returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
624
@doc """
Fetches the "Create"/"Announce" activities of `user` as seen by
`reading_user`, reversing the fetched order.

With `total: true` in `params` the fetched result is a keyword list and only
its `:items` entry is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
638
# Shared implementation of fetch_user_activities/3: restricts to the user's
# "Create"/"Announce" activities, passes the pinned object ids along, and only
# applies the reader's block/mute filters when the reader does not block
# `user`. Pagination defaults to :keyset.
defp fetch_activities_for_user(user, reading_user, params) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
665
@doc """
Fetches "Create"/"Announce" activities visible to `reading_user` using
offset pagination, reversing the fetched order.

With `total: true` in `params` the fetched result is a keyword list and only
its `:items` entry is reversed.
"""
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
676
# Shared implementation of fetch_statuses/2: forces the type filter to
# "Create"/"Announce" and fetches with offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
687
# Recipient addresses to query for when listing a user's activities:
# nothing at all in godmode, only the public collection for absent or
# non-local readers, and public + local-public + the reader's own AP id and
# followed addresses for local readers.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: nil}), do: [Constants.as_public()]

defp user_activities_recipients(%{reading_user: reading_user}) do
  if reading_user.local do
    [
      Constants.as_public(),
      as_local_public(),
      reading_user.ap_id | User.following(reading_user)
    ]
  else
    [Constants.as_public()]
  end
end
701
# Filters out "Announce" activities whose announced object was authored by
# the `:announce_filtering_user`. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
721
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing option leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
729
# Keeps activities whose object's embedded "tag" array contains all of the
# `:tag_all` values; a single binary tag is delegated to
# restrict_embedded_tag_any/2. Raises when preloading is skipped.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
746
# Keeps activities whose object's embedded "tag" array contains at least one
# of the `:tag` values (a binary is treated as a one-element list). Raises
# when preloading is skipped.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
763
# Drops activities whose object's embedded "tag" array contains any of the
# `:tag_reject` values (a binary is treated as a one-element list). Raises
# when preloading is skipped.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
781
# Query selecting the distinct ids of objects linked (through
# "hashtags_objects") to a hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(
    hto in "hashtags_objects",
    inner_join: ht in Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
789
# Keeps activities whose object is linked to every hashtag in `:tag_all`.
# A single tag is delegated to restrict_hashtag_any/2. Raises when
# preloading is skipped.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
820
# Keeps activities whose object is linked to at least one of the hashtags in
# `:tag` (a binary is treated as a one-element list). The hashtag ids are
# looked up first and the result is made distinct on, and ordered by, the
# object id (descending). Raises when preloading is skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
846
# Drops activities whose object is linked to any of the hashtags in
# `:tag_reject` (a binary is treated as a one-element list). Raises when
# preloading is skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
863
# Raised by filters that need the joined object when the caller asked to
# skip preloading.
defp raise_on_missing_preload, do: raise("Can't use the child object without preloading!")
867
# Keeps activities addressed to at least one of `recipients`. With a user,
# activities authored by that user are admitted too via `or_where`. An empty
# recipient list leaves the query unchanged.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
881
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
887
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
893
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
899
# Keeps only activities of the given `:type`. A binary is compared for
# equality; any other value is matched with `= ANY(?)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
909
# Keeps only activities whose data "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
915
# Keeps only activities whose object's "likes" contains the AP id given as
# `:favorited_by`.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
924
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Raises whenever `:only_media` is present together
# with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
938
# Reply filtering, in three modes:
#   * `exclude_replies: true` drops every object that has an "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and replies addressed to
#     the filtering user;
#   * `reply_visibility: "following"` keeps non-Creates, non-replies, replies
#     addressed to the user or their friends, and the user's own activities.
# Anything else leaves the query unchanged.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
992
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
998
# Hides activities authored by, or addressed ("to") to, users muted by the
# `:muting_user`; the user's own activities survive the second check. Unless
# preloading is skipped, rows with a thread-mute entry are dropped too.
# `with_muted: true` disables the filter altogether.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1025
# Applies the block filters of the `:blocking_user`: blocked users, blocked
# recipients, blocked recipient domains, boosts of blocked users, and blocked
# author domains (for both the activity and its object). The object is
# joined first if the query does not carry an :object binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1092
# Hides activities from accounts that block `opts[:blocking_user]`, unless the
# `[:activitypub, :blockers_visible]` setting is exactly `true`.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blockers = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author does not block the viewer
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blockers))
      # Not a boost addressed to someone who blocks the viewer
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blockers
        )
      )
  end
end

defp restrict_blockers_visibility(query, _opts), do: query
1117
# With `restrict_unlisted: true`, drops activities whose "cc" contains the
# public address.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public_addresses = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^public_addresses
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1131
# With `pinned: true`, keeps only "Create" activities whose object id is in
# `pinned_object_ids`. The object id is read from the activity data, either
# from an embedded object's "id" or from the bare "object" reference.
#
# Only the activity binding is used: the condition never touches the joined
# object, so requiring a named `:object` binding would only make this raise on
# queries built without that join.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  from(
    activity in query,
    where:
      fragment(
        "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
        activity.data,
        activity.data,
        activity.data,
        ^ids
      )
  )
end

defp restrict_pinned(query, _), do: query
1147
# Drops "Announce" activities from actors whose reblogs `opts[:muting_user]`
# has muted. `opts[:reblog_muted_users_ap_ids]` takes precedence over a fresh
# lookup through `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1164
# Keeps only activities whose actor URL has the given host as its third
# "/"-separated segment. `opts[:instance]` is either one host or a list of hosts.
defp restrict_instance(query, %{instance: host}) when is_binary(host) do
  where(query, fragment("split_part(actor::text, '/'::text, 3) = ?", ^host))
end

defp restrict_instance(query, %{instance: hosts}) when is_list(hosts) do
  where(query, fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^hosts))
end

defp restrict_instance(query, _opts), do: query
1180
# Applies the user's content filters: drops activities whose object "content"
# matches the regex from `Filter.compose_regex/1`, except those authored by
# the user. A `nil` regex leaves the query untouched.
#
# The object is addressed through the named `:object` binding (joined here if
# missing) instead of by position, so the filter does not depend on the object
# being the first join of the incoming query.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      query =
        if has_named_binding?(query, :object),
          do: query,
          else: Activity.with_joined_object(query)

      from([activity, object: o] in query,
        where:
          fragment("not(?->>'content' ~* ?)", o.data, ^regex) or
            activity.actor == ^user.ap_id
      )
  end
end

# Falls back to the blocking user when no `:user` option is present.
defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1200
# Unless `include_poll_votes: true` is set, removes activities whose object
# type is "Answer". Without a joined `:object` binding the query is returned
# unchanged.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1212
# Unless `invisible_actors: true` is set, removes activities authored by users
# returned by the `invisible: true` user query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  hidden_actor_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^hidden_actor_ids)
end
1223
# Removes the activity with id `opts[:exclude_id]` when that id is a binary.
defp exclude_id(query, %{exclude_id: excluded}) when is_binary(excluded) do
  where(query, [activity], activity.id != ^excluded)
end

defp exclude_id(query, _opts), do: query
1229
# Preloads objects through `Activity.with_preloaded_object/1` unless
# `skip_preload: true` was requested.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1236
# Preloads bookmarks of `opts[:user]` unless `skip_preload: true` was requested.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1243
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1250
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) unless `skip_preload: true` was requested.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1257
# Orders by id in the direction given by `opts[:order]` (`:desc` or `:asc`);
# any other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1269
# Normalizes the hashtag options (`:tag`, `:tag_all`, `:tag_reject`) with
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values are left alone.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn tag_key, acc ->
    case acc[tag_key] do
      names when is_list(names) ->
        normalized = names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
        Map.put(acc, tag_key, normalized)

      name when is_bitstring(name) ->
        Map.put(acc, tag_key, Hashtag.normalize_name(name))

      _other ->
        acc
    end
  end)
end
1289
# Loads the relationship AP ids needed by the block/mute restrictions in one
# call to `User.outgoing_relationships_ap_ids/2` and returns three option maps
# (for blocks, mutes and reblog mutes), each seeded with its preloaded list.
# Values already present in `opts` are never overwritten.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  # Blocks are only preloaded when they belong to the same user as the mutes.
  relationships =
    if blocking_user && blocking_user == muting_user,
      do: [:block | mute_relationships],
      else: mute_relationships

  preloaded = User.outgoing_relationships_ap_ids(muting_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1311
@doc """
Builds the activity query for `recipients`, narrowed by the filters in `opts`.

`opts` is a map. Hashtag options are normalized first and the block/mute
relationship lists are preloaded once before the restrictions are applied.
Hashtag filtering uses the hashtag-table restrictions when the
`:improved_hashtag_timeline` feature is enabled and the embedded-tag
restrictions otherwise.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied exactly once; piping it a second time only
  # repeats the same regex condition in the WHERE clause.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1368
@doc """
Fetches the activities favourited by `user`, newest favourite first.

Results are ordered by the id of the "Like" activity (descending), and that id
is exposed as `pagination_id` on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  # Ordering is fixed here, so pagination is told not to add its own.
  pagination_params = Map.put(params, :skip_order, true)

  likes_with_targets
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1386
# Adds the user's AP id to the "cc" of every activity whose "bcc" contains one
# of the user's `list_memberships`. Activities without a non-empty "bcc", and
# calls with no memberships or no user, pass through unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single address; wrap it so the result
        # is always a proper list instead of `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1402
# Keeps activities addressed to any of `recipients`, or public activities
# addressed to any of `recipients_with_public`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1411
@doc """
Fetches one page of activities bounded by `recipients` and
`recipients_with_public`, returning the paginated result in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1423
# Stores `file` through `Upload.store/2` and inserts an object holding the
# resulting data, tagged with `opts[:actor]` when one is given. Any non-`:ok`
# result from the store is returned untouched.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      Repo.insert(%Object{data: Maps.put_if_present(stored, "actor", opts[:actor])})

    failure ->
      failure
  end
end
1432
# Extracts a URL string from an actor's "url" value, which may be a string, a
# link map with a string "href", or a list (only its first element is used).
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    link when is_binary(link) -> link
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _ -> nil
  end
end
1444
# Normalizes an image value into `%{"type" => "Image", "url" => [%{"href" => url}]}`.
# A list is reduced to its first element; values without a "url" yield nil.
defp normalize_image(image) do
  case image do
    %{"url" => url} -> %{"type" => "Image", "url" => [%{"href" => url}]}
    [first | _rest] -> normalize_image(first)
    _ -> nil
  end
end
1454
# Converts a fetched actor document (`data`, a string-keyed map) into the
# attribute map used to build a remote user.
#
# `additional[:nickname_from_acct]`, when present, is used as the nickname;
# otherwise one is generated from the document.
#
# The document comes from a remote server, so malformed "attachment" and
# "tag" entries are skipped instead of aborting the whole import.
defp object_to_user_data(data, additional) do
  # List.wrap/1 tolerates a nil value or a single un-listed entry.
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Only Emoji tags carrying both a string name and an icon URL are usable;
  # the generator pattern silently skips everything else.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(Map.get(data, "tag")),
        is_binary(name),
        into: %{},
        do: {String.trim(name, ":"), url}

  # Read before maybe_fix_user_object/1 rewrites the document, as before.
  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
1521
# Builds "username@host" from the actor's "preferredUsername" and the host of
# its "id". When `[WebFinger, :update_nickname_on_user_fetch]` is enabled and
# a WebFinger lookup of that name returns an "acct:" subject, the subject's
# account is used instead.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  host = URI.parse(data["id"]).host
  fallback = "#{username}@#{host}"

  with true <- !!Config.get([WebFinger, :update_nickname_on_user_fetch]),
       {:ok, %{"subject" => "acct:" <> acct}} <- WebFinger.finger(fallback) do
    acct
  else
    _ -> fallback
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_actor), do: nil
1537
@doc """
Fetches the remote following and follower collections of `user` and returns
`{:ok, info}` with their counters and whether each collection is private.

Any failure is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1557
# Returns integer counters as-is; anything else counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1560
@doc """
Merges remote follow information into `user_data[:info]` when external user
synchronization is enabled, the actor type is "Person" or "Service", and both
collection addresses are present. Otherwise `user_data` is returned as-is;
unexpected failures are logged.
"""
# NOTE(review): the type check reads `user_data[:type]`, while
# `object_to_user_data/2` in this file emits `:actor_type` — confirm which key
# callers are expected to provide.
def maybe_update_follow_information(user_data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1591
# Decides whether a follow collection is private.
#
# An embedded first page of a page type means public. A referenced first page
# is fetched: a page type means public, a 401/403 response means private, and
# any other outcome is an error. A collection without "first" is private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1608
@doc """
Runs the actor document through `MRF.filter/1` and converts the accepted
result into user data. Any other filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    rejected -> {:error, rejected}
  end
end
1616
@doc """
Fetches the actor at `ap_id` and turns it into user data, including follow
information when available. Failures are logged and returned as `{:error, reason}`.
"""
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = error ->
      log_user_fetch_failure(ap_id, reason)
      error
  end
end

# Logs a failed actor fetch at a level matching the failure.
# If this has been deleted, only log a debug and not an error
defp log_user_fetch_failure(ap_id, "Object has been deleted" = reason),
  do: Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

defp log_user_fetch_failure(ap_id, {:reject, reason}),
  do: Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

defp log_user_fetch_failure(ap_id, reason),
  do: Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
1636
@doc """
Frees up `data[:nickname]` when another stored user with a different AP id
already owns it, by renaming that user to "<id>.<nickname>".

Only logs when the stored user has the same AP id; returns nil when there is
no binary nickname or no stored user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

      old_user
      |> User.remote_user_changeset(renamed)
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1658
@doc """
Builds the pinned-objects map (`%{object_ap_id => pinned_at}`) from a featured
collection document.

An "OrderedCollection" with a "first" page has that page fetched and its
"orderedItems" used; other "OrderedCollection"/"Collection" documents are
resolved through `Collections.Fetcher.fetch_collection/1`.

Always returns a plain map. Fetch failures and unrecognized documents are
logged and yield `%{}`, so callers can wrap the result in `{:ok, _}` without
ending up with a nested tuple.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{} = page} ->
      page
      |> Map.get("orderedItems")
      |> pinned_at_by_ap_id()

    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      pinned_at_by_ap_id(objects)

    e ->
      Logger.error("Could not fetch featured collection #{inspect(collection)}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(data) do
  Logger.error("Could not parse featured collection #{inspect(data)}")
  %{}
end

# Maps each collection item to `{ap_id, now}`. Items can either be a map with
# an "id" _or_ a bare AP id string; anything else (including a missing item
# list) is ignored.
defp pinned_at_by_ap_id(items) when is_list(items) do
  items
  |> Enum.flat_map(fn
    ap_id when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    %{"id" => ap_id} when is_binary(ap_id) -> [{ap_id, NaiveDateTime.utc_now()}]
    _ -> []
  end)
  |> Map.new()
end

defp pinned_at_by_ap_id(_items), do: %{}
1689
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pins}`.

A nil address, or a fetch that fails (which is logged), gives `{:ok, %{}}`.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    failure ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(failure)}")
      {:ok, %{}}
  end
end
1703
@doc """
Ensures every pinned object of the given user data is available, from the
cache or by fetching it. Returns `:ok` when all are, `:error` otherwise, and
nil for nil input.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  available? = fn {ap_id, _pinned_at} ->
    Object.get_cached_by_ap_id(ap_id) ||
      match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  end

  if Enum.all?(pins, available?), do: :ok, else: :error
end
1716
@doc """
Creates or refreshes the user for the actor at `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched, its pinned objects are fetched in a background
task, and the user is updated or inserted. Fetch errors are returned as-is.
"""
def make_user_from_ap_id(ap_id, additional \\ []) do
  cached = User.get_cached_by_ap_id(ap_id)

  cond do
    cached && !User.ap_enabled?(cached) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id, additional) do
        {:ok, data} ->
          {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

          if cached do
            cached
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
          else
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          failure
      end
  end
end
1741
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id found
there. Returns `{:error, "No AP id in WebFinger"}` when the lookup does not
provide both an AP id and an "acct:" subject.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1750
# Filters out broken threads: true only when the entire thread of `activity`
# is visible to `user`.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1755
# Post-processing check for a single activity; currently this is only the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1760
@doc """
Returns a query for "Create" activities with direct visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{type: "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1767 end