don't error out if the featured collection has a string ID
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
41 defp get_recipients(%{"type" => "Create"} = data) do
42 to = Map.get(data, "to", [])
43 cc = Map.get(data, "cc", [])
44 bcc = Map.get(data, "bcc", [])
45 actor = Map.get(data, "actor", [])
46 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
47 {recipients, to, cc}
48 end
49
50 defp get_recipients(data) do
51 to = Map.get(data, "to", [])
52 cc = Map.get(data, "cc", [])
53 bcc = Map.get(data, "bcc", [])
54 recipients = Enum.concat([to, cc, bcc])
55 {recipients, to, cc}
56 end
57
58 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
59 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60
61 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
62 case User.get_cached_by_ap_id(actor) do
63 %User{is_active: true} -> true
64 _ -> false
65 end
66 end
67
68 defp check_actor_can_insert(_), do: true
69
70 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
71 limit = Config.get([:instance, :remote_limit])
72 String.length(content) <= limit
73 end
74
75 defp check_remote_limit(_), do: true
76
77 def increase_note_count_if_public(actor, object) do
78 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
79 end
80
81 def decrease_note_count_if_public(actor, object) do
82 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
83 end
84
85 def update_last_status_at_if_public(actor, object) do
86 if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
87 end
88
89 defp increase_replies_count_if_reply(%{
90 "object" => %{"inReplyTo" => reply_ap_id} = object,
91 "type" => "Create"
92 }) do
93 if is_public?(object) do
94 Object.increase_replies_count(reply_ap_id)
95 end
96 end
97
98 defp increase_replies_count_if_reply(_create_data), do: :noop
99
100 @object_types ~w[Question Answer Audio Video Event Article Note Page]
101 @impl true
102 def persist(%{"type" => type} = object, meta) when type in @object_types do
103 with {:ok, object} <- Object.create(object) do
104 {:ok, object, meta}
105 end
106 end
107
108 @impl true
109 def persist(object, meta) do
110 with local <- Keyword.fetch!(meta, :local),
111 {recipients, _, _} <- get_recipients(object),
112 {:ok, activity} <-
113 Repo.insert(%Activity{
114 data: object,
115 local: local,
116 recipients: recipients,
117 actor: object["actor"]
118 }),
119 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
120 {:ok, _} <- maybe_create_activity_expiration(activity) do
121 {:ok, activity, meta}
122 end
123 end
124
125 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
126 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
127 with nil <- Activity.normalize(map),
128 map <- lazy_put_activity_defaults(map, fake),
129 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
130 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
131 {:ok, map} <- MRF.filter(map),
132 {recipients, _, _} = get_recipients(map),
133 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
134 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
135 {:ok, map, object} <- insert_full_object(map),
136 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
137 # Splice in the child object if we have one.
138 activity = Maps.put_if_present(activity, :object, object)
139
140 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
141 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
142 end)
143
144 # Add local posts to search index
145 if local, do: Pleroma.Search.add_to_index(activity)
146
147 {:ok, activity}
148 else
149 %Activity{} = activity ->
150 {:ok, activity}
151
152 {:actor_check, _} ->
153 {:error, false}
154
155 {:containment, _} = error ->
156 error
157
158 {:error, _} = error ->
159 error
160
161 {:fake, true, map, recipients} ->
162 activity = %Activity{
163 data: map,
164 local: local,
165 actor: map["actor"],
166 recipients: recipients,
167 id: "pleroma:fakeid"
168 }
169
170 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
171 {:ok, activity}
172
173 {:remote_limit_pass, _} ->
174 {:error, :remote_limit}
175
176 {:reject, _} = e ->
177 {:error, e}
178 end
179 end
180
181 defp insert_activity_with_expiration(data, local, recipients) do
182 struct = %Activity{
183 data: data,
184 local: local,
185 actor: data["actor"],
186 recipients: recipients
187 }
188
189 with {:ok, activity} <- Repo.insert(struct) do
190 maybe_create_activity_expiration(activity)
191 end
192 end
193
194 def notify_and_stream(activity) do
195 Notification.create_notifications(activity)
196
197 conversation = create_or_bump_conversation(activity, activity.actor)
198 participations = get_participations(conversation)
199 stream_out(activity)
200 stream_out_participations(participations)
201 end
202
203 defp maybe_create_activity_expiration(
204 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
205 ) do
206 with {:ok, _job} <-
207 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
208 activity_id: activity.id,
209 expires_at: expires_at
210 }) do
211 {:ok, activity}
212 end
213 end
214
215 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216
217 defp create_or_bump_conversation(activity, actor) do
218 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
219 %User{} = user <- User.get_cached_by_ap_id(actor) do
220 Participation.mark_as_read(user, conversation)
221 {:ok, conversation}
222 end
223 end
224
225 defp get_participations({:ok, conversation}) do
226 conversation
227 |> Repo.preload(:participations, force: true)
228 |> Map.get(:participations)
229 end
230
231 defp get_participations(_), do: []
232
233 def stream_out_participations(participations) do
234 participations =
235 participations
236 |> Repo.preload(:user)
237
238 Streamer.stream("participation", participations)
239 end
240
241 @impl true
242 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
243 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
244 conversation = Repo.preload(conversation, :participations)
245
246 last_activity_id =
247 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
248 user: user,
249 blocking_user: user
250 })
251
252 if last_activity_id do
253 stream_out_participations(conversation.participations)
254 end
255 end
256 end
257
258 @impl true
259 def stream_out_participations(_, _), do: :noop
260
261 @impl true
262 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
263 when data_type in ["Create", "Announce", "Delete"] do
264 activity
265 |> Topics.get_activity_topics()
266 |> Streamer.stream(activity)
267 end
268
269 @impl true
270 def stream_out(_activity) do
271 :noop
272 end
273
274 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
275 def create(params, fake \\ false) do
276 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
277 result
278 end
279 end
280
281 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
282 additional = params[:additional] || %{}
283 # only accept false as false value
284 local = !(params[:local] == false)
285 published = params[:published]
286 quick_insert? = Config.get([:env]) == :benchmark
287
288 create_data =
289 make_create_data(
290 %{to: to, actor: actor, published: published, context: context, object: object},
291 additional
292 )
293
294 with {:ok, activity} <- insert(create_data, local, fake),
295 {:fake, false, activity} <- {:fake, fake, activity},
296 _ <- increase_replies_count_if_reply(create_data),
297 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
298 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
299 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
300 _ <- notify_and_stream(activity),
301 :ok <- maybe_schedule_poll_notifications(activity),
302 :ok <- maybe_federate(activity) do
303 {:ok, activity}
304 else
305 {:quick_insert, true, activity} ->
306 {:ok, activity}
307
308 {:fake, true, activity} ->
309 {:ok, activity}
310
311 {:error, message} ->
312 Repo.rollback(message)
313 end
314 end
315
316 defp maybe_schedule_poll_notifications(activity) do
317 PollWorker.schedule_poll_end(activity)
318 :ok
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
383 superuser
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
386 end)
387
388 {:ok, activity}
389 else
390 {:error, error} -> Repo.rollback(error)
391 end
392 end
393
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
396 params = %{
397 "type" => "Move",
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id,
401 "to" => [origin.follower_address]
402 }
403
404 with true <- origin.ap_id in target.also_known_as,
405 {:ok, activity} <- insert(params, local),
406 _ <- notify_and_stream(activity) do
407 maybe_federate(activity)
408
409 BackgroundWorker.enqueue("move_following", %{
410 "origin_id" => origin.id,
411 "target_id" => target.id
412 })
413
414 {:ok, activity}
415 else
416 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
417 err -> err
418 end
419 end
420
421 def fetch_activities_for_context_query(context, opts) do
422 public = [Constants.as_public()]
423
424 recipients =
425 if opts[:user],
426 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
427 else: public
428
429 from(activity in Activity)
430 |> maybe_preload_objects(opts)
431 |> maybe_preload_bookmarks(opts)
432 |> maybe_set_thread_muted_field(opts)
433 |> restrict_blocked(opts)
434 |> restrict_blockers_visibility(opts)
435 |> restrict_recipients(recipients, opts[:user])
436 |> restrict_filtered(opts)
437 |> where(
438 [activity],
439 fragment(
440 "?->>'type' = ? and ?->>'context' = ?",
441 activity.data,
442 "Create",
443 activity.data,
444 ^context
445 )
446 )
447 |> exclude_poll_votes(opts)
448 |> exclude_id(opts)
449 |> order_by([activity], desc: activity.id)
450 end
451
452 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
453 def fetch_activities_for_context(context, opts \\ %{}) do
454 context
455 |> fetch_activities_for_context_query(opts)
456 |> Repo.all()
457 end
458
459 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
460 FlakeId.Ecto.CompatType.t() | nil
461 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
462 context
463 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
464 |> restrict_visibility(%{visibility: "direct"})
465 |> limit(1)
466 |> select([a], a.id)
467 |> Repo.one()
468 end
469
470 defp fetch_paginated_optimized(query, opts, pagination) do
471 # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
472 # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
473 opts = Map.put(opts, :skip_extra_order, true)
474
475 Pagination.fetch_paginated(query, opts, pagination)
476 end
477
478 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
479 list_memberships = Pleroma.List.memberships(opts[:user])
480
481 fetch_activities_query(recipients ++ list_memberships, opts)
482 |> fetch_paginated_optimized(opts, pagination)
483 |> Enum.reverse()
484 |> maybe_update_cc(list_memberships, opts[:user])
485 end
486
487 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
488 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
489 opts = Map.delete(opts, :user)
490
491 [Constants.as_public()]
492 |> fetch_activities_query(opts)
493 |> restrict_unlisted(opts)
494 |> fetch_paginated_optimized(opts, pagination)
495 end
496
497 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
498 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
499 opts
500 |> Map.put(:restrict_unlisted, true)
501 |> fetch_public_or_unlisted_activities(pagination)
502 end
503
504 @valid_visibilities ~w[direct unlisted public private]
505
506 defp restrict_visibility(query, %{visibility: visibility})
507 when is_list(visibility) do
508 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
509 from(
510 a in query,
511 where:
512 fragment(
513 "activity_visibility(?, ?, ?) = ANY (?)",
514 a.actor,
515 a.recipients,
516 a.data,
517 ^visibility
518 )
519 )
520 else
521 Logger.error("Could not restrict visibility to #{visibility}")
522 end
523 end
524
525 defp restrict_visibility(query, %{visibility: visibility})
526 when visibility in @valid_visibilities do
527 from(
528 a in query,
529 where:
530 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
531 )
532 end
533
534 defp restrict_visibility(_query, %{visibility: visibility})
535 when visibility not in @valid_visibilities do
536 Logger.error("Could not restrict visibility to #{visibility}")
537 end
538
539 defp restrict_visibility(query, _visibility), do: query
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when is_list(visibility) do
543 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
544 from(
545 a in query,
546 where:
547 not fragment(
548 "activity_visibility(?, ?, ?) = ANY (?)",
549 a.actor,
550 a.recipients,
551 a.data,
552 ^visibility
553 )
554 )
555 else
556 Logger.error("Could not exclude visibility to #{visibility}")
557 query
558 end
559 end
560
561 defp exclude_visibility(query, %{exclude_visibilities: visibility})
562 when visibility in @valid_visibilities do
563 from(
564 a in query,
565 where:
566 not fragment(
567 "activity_visibility(?, ?, ?) = ?",
568 a.actor,
569 a.recipients,
570 a.data,
571 ^visibility
572 )
573 )
574 end
575
576 defp exclude_visibility(query, %{exclude_visibilities: visibility})
577 when visibility not in [nil | @valid_visibilities] do
578 Logger.error("Could not exclude visibility to #{visibility}")
579 query
580 end
581
582 defp exclude_visibility(query, _visibility), do: query
583
584 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
585 do: query
586
587 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
588 do: query
589
590 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
591 from(
592 a in query,
593 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
594 )
595 end
596
597 defp restrict_thread_visibility(query, _, _), do: query
598
599 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
600 params =
601 params
602 |> Map.put(:user, reading_user)
603 |> Map.put(:actor_id, user.ap_id)
604
605 %{
606 godmode: params[:godmode],
607 reading_user: reading_user
608 }
609 |> user_activities_recipients()
610 |> fetch_activities(params)
611 |> Enum.reverse()
612 end
613
614 def fetch_user_activities(user, reading_user, params \\ %{})
615
616 def fetch_user_activities(user, reading_user, %{total: true} = params) do
617 result = fetch_activities_for_user(user, reading_user, params)
618
619 Keyword.put(result, :items, Enum.reverse(result[:items]))
620 end
621
622 def fetch_user_activities(user, reading_user, params) do
623 user
624 |> fetch_activities_for_user(reading_user, params)
625 |> Enum.reverse()
626 end
627
628 defp fetch_activities_for_user(user, reading_user, params) do
629 params =
630 params
631 |> Map.put(:type, ["Create", "Announce"])
632 |> Map.put(:user, reading_user)
633 |> Map.put(:actor_id, user.ap_id)
634 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
635
636 params =
637 if User.blocks?(reading_user, user) do
638 params
639 else
640 params
641 |> Map.put(:blocking_user, reading_user)
642 |> Map.put(:muting_user, reading_user)
643 end
644
645 pagination_type = Map.get(params, :pagination_type) || :keyset
646
647 %{
648 godmode: params[:godmode],
649 reading_user: reading_user
650 }
651 |> user_activities_recipients()
652 |> fetch_activities(params, pagination_type)
653 end
654
655 def fetch_statuses(reading_user, %{total: true} = params) do
656 result = fetch_activities_for_reading_user(reading_user, params)
657 Keyword.put(result, :items, Enum.reverse(result[:items]))
658 end
659
660 def fetch_statuses(reading_user, params) do
661 reading_user
662 |> fetch_activities_for_reading_user(params)
663 |> Enum.reverse()
664 end
665
666 defp fetch_activities_for_reading_user(reading_user, params) do
667 params = Map.put(params, :type, ["Create", "Announce"])
668
669 %{
670 godmode: params[:godmode],
671 reading_user: reading_user
672 }
673 |> user_activities_recipients()
674 |> fetch_activities(params, :offset)
675 end
676
677 defp user_activities_recipients(%{godmode: true}), do: []
678
679 defp user_activities_recipients(%{reading_user: reading_user}) do
680 if reading_user do
681 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
682 else
683 [Constants.as_public()]
684 end
685 end
686
687 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
688 raise "Can't use the child object without preloading!"
689 end
690
691 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
692 from(
693 [activity, object] in query,
694 where:
695 fragment(
696 "?->>'type' != ? or ?->>'actor' != ?",
697 activity.data,
698 "Announce",
699 object.data,
700 ^actor
701 )
702 )
703 end
704
705 defp restrict_announce_object_actor(query, _), do: query
706
707 defp restrict_since(query, %{since_id: ""}), do: query
708
709 defp restrict_since(query, %{since_id: since_id}) do
710 from(activity in query, where: activity.id > ^since_id)
711 end
712
713 defp restrict_since(query, _), do: query
714
715 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
716 raise_on_missing_preload()
717 end
718
719 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
720 from(
721 [_activity, object] in query,
722 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
723 )
724 end
725
726 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
727 restrict_embedded_tag_any(query, %{tag: tag})
728 end
729
730 defp restrict_embedded_tag_all(query, _), do: query
731
732 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
733 raise_on_missing_preload()
734 end
735
736 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
737 from(
738 [_activity, object] in query,
739 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
740 )
741 end
742
743 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
744 restrict_embedded_tag_any(query, %{tag: [tag]})
745 end
746
747 defp restrict_embedded_tag_any(query, _), do: query
748
749 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
750 raise_on_missing_preload()
751 end
752
753 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
754 from(
755 [_activity, object] in query,
756 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
757 )
758 end
759
760 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
761 when is_binary(tag_reject) do
762 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
763 end
764
765 defp restrict_embedded_tag_reject_any(query, _), do: query
766
767 defp object_ids_query_for_tags(tags) do
768 from(hto in "hashtags_objects")
769 |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
770 |> where([hto, ht], ht.name in ^tags)
771 |> select([hto], hto.object_id)
772 |> distinct([hto], true)
773 end
774
775 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
776 raise_on_missing_preload()
777 end
778
779 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
780 restrict_hashtag_any(query, %{tag: single_tag})
781 end
782
783 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
784 from(
785 [_activity, object] in query,
786 where:
787 fragment(
788 """
789 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
790 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
791 AND hashtags_objects.object_id = ?) @> ?
792 """,
793 ^tags,
794 object.id,
795 ^tags
796 )
797 )
798 end
799
800 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
801 restrict_hashtag_all(query, %{tag_all: [tag]})
802 end
803
804 defp restrict_hashtag_all(query, _), do: query
805
806 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
807 raise_on_missing_preload()
808 end
809
810 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
811 hashtag_ids =
812 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
813 |> Repo.all()
814
815 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
816 from(
817 [_activity, object] in query,
818 join: hto in "hashtags_objects",
819 on: hto.object_id == object.id,
820 where: hto.hashtag_id in ^hashtag_ids,
821 distinct: [desc: object.id],
822 order_by: [desc: object.id]
823 )
824 end
825
826 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
827 restrict_hashtag_any(query, %{tag: [tag]})
828 end
829
830 defp restrict_hashtag_any(query, _), do: query
831
832 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
833 raise_on_missing_preload()
834 end
835
836 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
837 from(
838 [_activity, object] in query,
839 where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
840 )
841 end
842
843 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
844 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
845 end
846
847 defp restrict_hashtag_reject_any(query, _), do: query
848
849 defp raise_on_missing_preload do
850 raise "Can't use the child object without preloading!"
851 end
852
853 defp restrict_recipients(query, [], _user), do: query
854
855 defp restrict_recipients(query, recipients, nil) do
856 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
857 end
858
859 defp restrict_recipients(query, recipients, user) do
860 from(
861 activity in query,
862 where: fragment("? && ?", ^recipients, activity.recipients),
863 or_where: activity.actor == ^user.ap_id
864 )
865 end
866
867 defp restrict_local(query, %{local_only: true}) do
868 from(activity in query, where: activity.local == true)
869 end
870
871 defp restrict_local(query, _), do: query
872
873 defp restrict_remote(query, %{remote: true}) do
874 from(activity in query, where: activity.local == false)
875 end
876
877 defp restrict_remote(query, _), do: query
878
879 defp restrict_actor(query, %{actor_id: actor_id}) do
880 from(activity in query, where: activity.actor == ^actor_id)
881 end
882
883 defp restrict_actor(query, _), do: query
884
885 defp restrict_type(query, %{type: type}) when is_binary(type) do
886 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
887 end
888
889 defp restrict_type(query, %{type: type}) do
890 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
891 end
892
893 defp restrict_type(query, _), do: query
894
895 defp restrict_state(query, %{state: state}) do
896 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
897 end
898
899 defp restrict_state(query, _), do: query
900
901 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
902 from(
903 [_activity, object] in query,
904 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
905 )
906 end
907
908 defp restrict_favorited_by(query, _), do: query
909
910 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
911 raise "Can't use the child object without preloading!"
912 end
913
914 defp restrict_media(query, %{only_media: true}) do
915 from(
916 [activity, object] in query,
917 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
918 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
919 )
920 end
921
922 defp restrict_media(query, _), do: query
923
924 defp restrict_replies(query, %{exclude_replies: true}) do
925 from(
926 [_activity, object] in query,
927 where: fragment("?->>'inReplyTo' is null", object.data)
928 )
929 end
930
931 defp restrict_replies(query, %{
932 reply_filtering_user: %User{} = user,
933 reply_visibility: "self"
934 }) do
935 from(
936 [activity, object] in query,
937 where:
938 fragment(
939 "?->>'inReplyTo' is null OR ? = ANY(?)",
940 object.data,
941 ^user.ap_id,
942 activity.recipients
943 )
944 )
945 end
946
947 defp restrict_replies(query, %{
948 reply_filtering_user: %User{} = user,
949 reply_visibility: "following"
950 }) do
951 from(
952 [activity, object] in query,
953 where:
954 fragment(
955 """
956 ?->>'type' != 'Create' -- This isn't a Create
957 OR ?->>'inReplyTo' is null -- this isn't a reply
958 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
959 -- unless they are the author (because authors
960 -- are also part of the recipients). This leads
961 -- to a bug that self-replies by friends won't
962 -- show up.
963 OR ? = ? -- The actor is us
964 """,
965 activity.data,
966 object.data,
967 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
968 activity.recipients,
969 activity.actor,
970 activity.actor,
971 ^user.ap_id
972 )
973 )
974 end
975
976 defp restrict_replies(query, _), do: query
977
978 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
979 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
980 end
981
982 defp restrict_reblogs(query, _), do: query
983
984 defp restrict_muted(query, %{with_muted: true}), do: query
985
986 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
987 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
988
989 query =
990 from([activity] in query,
991 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
992 where:
993 fragment(
994 "not (?->'to' \\?| ?) or ? = ?",
995 activity.data,
996 ^mutes,
997 activity.actor,
998 ^user.ap_id
999 )
1000 )
1001
1002 unless opts[:skip_preload] do
1003 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1004 else
1005 query
1006 end
1007 end
1008
1009 defp restrict_muted(query, _), do: query
1010
1011 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1012 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1013 domain_blocks = user.domain_blocks || []
1014
1015 following_ap_ids = User.get_friends_ap_ids(user)
1016
1017 query =
1018 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1019
1020 from(
1021 [activity, object: o] in query,
1022 # You don't block the author
1023 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1024
1025 # You don't block any recipients, and didn't author the post
1026 where:
1027 fragment(
1028 "((not (? && ?)) or ? = ?)",
1029 activity.recipients,
1030 ^blocked_ap_ids,
1031 activity.actor,
1032 ^user.ap_id
1033 ),
1034
1035 # You don't block the domain of any recipients, and didn't author the post
1036 where:
1037 fragment(
1038 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1039 activity.recipients,
1040 ^domain_blocks,
1041 activity.actor,
1042 ^user.ap_id
1043 ),
1044
1045 # It's not a boost of a user you block
1046 where:
1047 fragment(
1048 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1049 activity.data,
1050 activity.data,
1051 ^blocked_ap_ids
1052 ),
1053
1054 # You don't block the author's domain, and also don't follow the author
1055 where:
1056 fragment(
1057 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1058 activity.actor,
1059 ^domain_blocks,
1060 activity.actor,
1061 ^following_ap_ids
1062 ),
1063
1064 # Same as above, but checks the Object
1065 where:
1066 fragment(
1067 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1068 o.data,
1069 ^domain_blocks,
1070 o.data,
1071 ^following_ap_ids
1072 )
1073 )
1074 end
1075
1076 defp restrict_blocked(query, _), do: query
1077
1078 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1079 if Config.get([:activitypub, :blockers_visible]) == true do
1080 query
1081 else
1082 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1083
1084 from(
1085 activity in query,
1086 # The author doesn't block you
1087 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1088
1089 # It's not a boost of a user that blocks you
1090 where:
1091 fragment(
1092 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1093 activity.data,
1094 activity.data,
1095 ^blocker_ap_ids
1096 )
1097 )
1098 end
1099 end
1100
1101 defp restrict_blockers_visibility(query, _), do: query
1102
1103 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1104 from(
1105 activity in query,
1106 where:
1107 fragment(
1108 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1109 activity.data,
1110 ^[Constants.as_public()]
1111 )
1112 )
1113 end
1114
1115 defp restrict_unlisted(query, _), do: query
1116
1117 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1118 from(
1119 [activity, object: o] in query,
1120 where:
1121 fragment(
1122 "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
1123 activity.data,
1124 activity.data,
1125 activity.data,
1126 ^ids
1127 )
1128 )
1129 end
1130
1131 defp restrict_pinned(query, _), do: query
1132
1133 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1134 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1135
1136 from(
1137 activity in query,
1138 where:
1139 fragment(
1140 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1141 activity.data,
1142 activity.actor,
1143 ^muted_reblogs
1144 )
1145 )
1146 end
1147
1148 defp restrict_muted_reblogs(query, _), do: query
1149
1150 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1151 from(
1152 activity in query,
1153 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1154 )
1155 end
1156
1157 defp restrict_instance(query, %{instance: instance}) when is_list(instance) do
1158 from(
1159 activity in query,
1160 where: fragment("split_part(actor::text, '/'::text, 3) = ANY(?)", ^instance)
1161 )
1162 end
1163
1164 defp restrict_instance(query, _), do: query
1165
1166 defp restrict_filtered(query, %{user: %User{} = user}) do
1167 case Filter.compose_regex(user) do
1168 nil ->
1169 query
1170
1171 regex ->
1172 from([activity, object] in query,
1173 where:
1174 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1175 activity.actor == ^user.ap_id
1176 )
1177 end
1178 end
1179
1180 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1181 restrict_filtered(query, %{user: user})
1182 end
1183
1184 defp restrict_filtered(query, _), do: query
1185
1186 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1187
1188 defp exclude_poll_votes(query, _) do
1189 if has_named_binding?(query, :object) do
1190 from([activity, object: o] in query,
1191 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1192 )
1193 else
1194 query
1195 end
1196 end
1197
1198 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1199
1200 defp exclude_invisible_actors(query, _opts) do
1201 invisible_ap_ids =
1202 User.Query.build(%{invisible: true, select: [:ap_id]})
1203 |> Repo.all()
1204 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1205
1206 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1207 end
1208
1209 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1210 from(activity in query, where: activity.id != ^id)
1211 end
1212
1213 defp exclude_id(query, _), do: query
1214
1215 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1216
1217 defp maybe_preload_objects(query, _) do
1218 query
1219 |> Activity.with_preloaded_object()
1220 end
1221
1222 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1223
1224 defp maybe_preload_bookmarks(query, opts) do
1225 query
1226 |> Activity.with_preloaded_bookmark(opts[:user])
1227 end
1228
1229 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1230 query
1231 |> Activity.with_preloaded_report_notes()
1232 end
1233
1234 defp maybe_preload_report_notes(query, _), do: query
1235
1236 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1237
1238 defp maybe_set_thread_muted_field(query, opts) do
1239 query
1240 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1241 end
1242
1243 defp maybe_order(query, %{order: :desc}) do
1244 query
1245 |> order_by(desc: :id)
1246 end
1247
1248 defp maybe_order(query, %{order: :asc}) do
1249 query
1250 |> order_by(asc: :id)
1251 end
1252
1253 defp maybe_order(query, _), do: query
1254
1255 defp normalize_fetch_activities_query_opts(opts) do
1256 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1257 case opts[key] do
1258 value when is_bitstring(value) ->
1259 Map.put(opts, key, Hashtag.normalize_name(value))
1260
1261 value when is_list(value) ->
1262 normalized_value =
1263 value
1264 |> Enum.map(&Hashtag.normalize_name/1)
1265 |> Enum.uniq()
1266
1267 Map.put(opts, key, normalized_value)
1268
1269 _ ->
1270 opts
1271 end
1272 end)
1273 end
1274
1275 defp fetch_activities_query_ap_ids_ops(opts) do
1276 source_user = opts[:muting_user]
1277 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1278
1279 ap_id_relationships =
1280 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1281 [:block | ap_id_relationships]
1282 else
1283 ap_id_relationships
1284 end
1285
1286 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1287
1288 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1289 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1290
1291 restrict_muted_reblogs_opts =
1292 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1293
1294 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1295 end
1296
1297 def fetch_activities_query(recipients, opts \\ %{}) do
1298 opts = normalize_fetch_activities_query_opts(opts)
1299
1300 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1301 fetch_activities_query_ap_ids_ops(opts)
1302
1303 config = %{
1304 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1305 }
1306
1307 query =
1308 Activity
1309 |> maybe_preload_objects(opts)
1310 |> maybe_preload_bookmarks(opts)
1311 |> maybe_preload_report_notes(opts)
1312 |> maybe_set_thread_muted_field(opts)
1313 |> maybe_order(opts)
1314 |> restrict_recipients(recipients, opts[:user])
1315 |> restrict_replies(opts)
1316 |> restrict_since(opts)
1317 |> restrict_local(opts)
1318 |> restrict_remote(opts)
1319 |> restrict_actor(opts)
1320 |> restrict_type(opts)
1321 |> restrict_state(opts)
1322 |> restrict_favorited_by(opts)
1323 |> restrict_blocked(restrict_blocked_opts)
1324 |> restrict_blockers_visibility(opts)
1325 |> restrict_muted(restrict_muted_opts)
1326 |> restrict_filtered(opts)
1327 |> restrict_media(opts)
1328 |> restrict_visibility(opts)
1329 |> restrict_thread_visibility(opts, config)
1330 |> restrict_reblogs(opts)
1331 |> restrict_pinned(opts)
1332 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1333 |> restrict_instance(opts)
1334 |> restrict_announce_object_actor(opts)
1335 |> restrict_filtered(opts)
1336 |> Activity.restrict_deactivated_users()
1337 |> exclude_poll_votes(opts)
1338 |> exclude_invisible_actors(opts)
1339 |> exclude_visibility(opts)
1340
1341 if Config.feature_enabled?(:improved_hashtag_timeline) do
1342 query
1343 |> restrict_hashtag_any(opts)
1344 |> restrict_hashtag_all(opts)
1345 |> restrict_hashtag_reject_any(opts)
1346 else
1347 query
1348 |> restrict_embedded_tag_any(opts)
1349 |> restrict_embedded_tag_all(opts)
1350 |> restrict_embedded_tag_reject_any(opts)
1351 end
1352 end
1353
1354 @doc """
1355 Fetch favorites activities of user with order by sort adds to favorites
1356 """
1357 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1358 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1359 user.ap_id
1360 |> Activity.Queries.by_actor()
1361 |> Activity.Queries.by_type("Like")
1362 |> Activity.with_joined_object()
1363 |> Object.with_joined_activity()
1364 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1365 |> order_by([like, _, _], desc_nulls_last: like.id)
1366 |> Pagination.fetch_paginated(
1367 Map.merge(params, %{skip_order: true}),
1368 pagination
1369 )
1370 end
1371
1372 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1373 Enum.map(activities, fn
1374 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1375 if Enum.any?(bcc, &(&1 in list_memberships)) do
1376 update_in(activity.data["cc"], &[user_ap_id | &1])
1377 else
1378 activity
1379 end
1380
1381 activity ->
1382 activity
1383 end)
1384 end
1385
1386 defp maybe_update_cc(activities, _, _), do: activities
1387
1388 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1389 from(activity in query,
1390 where:
1391 fragment("? && ?", activity.recipients, ^recipients) or
1392 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1393 ^Constants.as_public() in activity.recipients)
1394 )
1395 end
1396
1397 def fetch_activities_bounded(
1398 recipients,
1399 recipients_with_public,
1400 opts \\ %{},
1401 pagination \\ :keyset
1402 ) do
1403 fetch_activities_query([], opts)
1404 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1405 |> Pagination.fetch_paginated(opts, pagination)
1406 |> Enum.reverse()
1407 end
1408
1409 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1410 def upload(file, opts \\ []) do
1411 with {:ok, data} <- Upload.store(file, opts) do
1412 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1413
1414 Repo.insert(%Object{data: obj_data})
1415 end
1416 end
1417
1418 @spec get_actor_url(any()) :: binary() | nil
1419 defp get_actor_url(url) when is_binary(url), do: url
1420 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1421
1422 defp get_actor_url(url) when is_list(url) do
1423 url
1424 |> List.first()
1425 |> get_actor_url()
1426 end
1427
1428 defp get_actor_url(_url), do: nil
1429
1430 defp normalize_image(%{"url" => url}) do
1431 %{
1432 "type" => "Image",
1433 "url" => [%{"href" => url}]
1434 }
1435 end
1436
1437 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1438 defp normalize_image(_), do: nil
1439
1440 defp object_to_user_data(data) do
1441 fields =
1442 data
1443 |> Map.get("attachment", [])
1444 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1445 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1446
1447 emojis =
1448 data
1449 |> Map.get("tag", [])
1450 |> Enum.filter(fn
1451 %{"type" => "Emoji"} -> true
1452 _ -> false
1453 end)
1454 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1455 {String.trim(name, ":"), url}
1456 end)
1457
1458 is_locked = data["manuallyApprovesFollowers"] || false
1459 data = Transmogrifier.maybe_fix_user_object(data)
1460 is_discoverable = data["discoverable"] || false
1461 invisible = data["invisible"] || false
1462 actor_type = data["type"] || "Person"
1463
1464 featured_address = data["featured"]
1465 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1466
1467 public_key =
1468 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1469 data["publicKey"]["publicKeyPem"]
1470 else
1471 nil
1472 end
1473
1474 shared_inbox =
1475 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1476 data["endpoints"]["sharedInbox"]
1477 else
1478 nil
1479 end
1480
1481 user_data = %{
1482 ap_id: data["id"],
1483 uri: get_actor_url(data["url"]),
1484 ap_enabled: true,
1485 banner: normalize_image(data["image"]),
1486 fields: fields,
1487 emoji: emojis,
1488 is_locked: is_locked,
1489 is_discoverable: is_discoverable,
1490 invisible: invisible,
1491 avatar: normalize_image(data["icon"]),
1492 name: data["name"],
1493 follower_address: data["followers"],
1494 following_address: data["following"],
1495 featured_address: featured_address,
1496 bio: data["summary"] || "",
1497 actor_type: actor_type,
1498 also_known_as: Map.get(data, "alsoKnownAs", []),
1499 public_key: public_key,
1500 inbox: data["inbox"],
1501 shared_inbox: shared_inbox,
1502 pinned_objects: pinned_objects
1503 }
1504
1505 # nickname can be nil because of virtual actors
1506 if data["preferredUsername"] do
1507 Map.put(
1508 user_data,
1509 :nickname,
1510 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1511 )
1512 else
1513 Map.put(user_data, :nickname, nil)
1514 end
1515 end
1516
1517 def fetch_follow_information_for_user(user) do
1518 with {:ok, following_data} <-
1519 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1520 {:ok, hide_follows} <- collection_private(following_data),
1521 {:ok, followers_data} <-
1522 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1523 {:ok, hide_followers} <- collection_private(followers_data) do
1524 {:ok,
1525 %{
1526 hide_follows: hide_follows,
1527 follower_count: normalize_counter(followers_data["totalItems"]),
1528 following_count: normalize_counter(following_data["totalItems"]),
1529 hide_followers: hide_followers
1530 }}
1531 else
1532 {:error, _} = e -> e
1533 e -> {:error, e}
1534 end
1535 end
1536
1537 defp normalize_counter(counter) when is_integer(counter), do: counter
1538 defp normalize_counter(_), do: 0
1539
1540 def maybe_update_follow_information(user_data) do
1541 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1542 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1543 {_, true} <-
1544 {:collections_available,
1545 !!(user_data[:following_address] && user_data[:follower_address])},
1546 {:ok, info} <-
1547 fetch_follow_information_for_user(user_data) do
1548 info = Map.merge(user_data[:info] || %{}, info)
1549
1550 user_data
1551 |> Map.put(:info, info)
1552 else
1553 {:user_type_check, false} ->
1554 user_data
1555
1556 {:collections_available, false} ->
1557 user_data
1558
1559 {:enabled, false} ->
1560 user_data
1561
1562 e ->
1563 Logger.error(
1564 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1565 )
1566
1567 user_data
1568 end
1569 end
1570
1571 defp collection_private(%{"first" => %{"type" => type}})
1572 when type in ["CollectionPage", "OrderedCollectionPage"],
1573 do: {:ok, false}
1574
1575 defp collection_private(%{"first" => first}) do
1576 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1577 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1578 {:ok, false}
1579 else
1580 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1581 {:error, _} = e -> e
1582 e -> {:error, e}
1583 end
1584 end
1585
1586 defp collection_private(_data), do: {:ok, true}
1587
1588 def user_data_from_user_object(data) do
1589 with {:ok, data} <- MRF.filter(data) do
1590 {:ok, object_to_user_data(data)}
1591 else
1592 e -> {:error, e}
1593 end
1594 end
1595
1596 def fetch_and_prepare_user_from_ap_id(ap_id) do
1597 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1598 {:ok, data} <- user_data_from_user_object(data) do
1599 {:ok, maybe_update_follow_information(data)}
1600 else
1601 # If this has been deleted, only log a debug and not an error
1602 {:error, "Object has been deleted" = e} ->
1603 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1604 {:error, e}
1605
1606 {:error, {:reject, reason} = e} ->
1607 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1608 {:error, e}
1609
1610 {:error, e} ->
1611 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1612 {:error, e}
1613 end
1614 end
1615
1616 def maybe_handle_clashing_nickname(data) do
1617 with nickname when is_binary(nickname) <- data[:nickname],
1618 %User{} = old_user <- User.get_by_nickname(nickname),
1619 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1620 Logger.info(
1621 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1622 )
1623
1624 old_user
1625 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1626 |> User.update_and_set_cache()
1627 else
1628 {:ap_id_comparison, true} ->
1629 Logger.info(
1630 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1631 )
1632
1633 _ ->
1634 nil
1635 end
1636 end
1637
1638 def pin_data_from_featured_collection(%{
1639 "type" => "OrderedCollection",
1640 "first" => first
1641 }) do
1642 with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(first) do
1643 page
1644 |> Map.get("orderedItems")
1645 |> Map.new(fn %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()} end)
1646 else
1647 e ->
1648 Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
1649 {:ok, %{}}
1650 end
1651 end
1652
1653 def pin_data_from_featured_collection(
1654 %{
1655 "type" => type
1656 } = collection
1657 )
1658 when type in ["OrderedCollection", "Collection"] do
1659 {:ok, objects} = Collections.Fetcher.fetch_collection(collection)
1660
1661 # Items can either be a map _or_ a string
1662 objects
1663 |> Map.new(fn
1664 ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
1665 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1666 end)
1667 end
1668
1669 def fetch_and_prepare_featured_from_ap_id(nil) do
1670 {:ok, %{}}
1671 end
1672
1673 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1674 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1675 {:ok, pin_data_from_featured_collection(data)}
1676 else
1677 e ->
1678 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1679 {:ok, %{}}
1680 end
1681 end
1682
1683 def pinned_fetch_task(nil), do: nil
1684
1685 def pinned_fetch_task(%{pinned_objects: pins}) do
1686 if Enum.all?(pins, fn {ap_id, _} ->
1687 Object.get_cached_by_ap_id(ap_id) ||
1688 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1689 end) do
1690 :ok
1691 else
1692 :error
1693 end
1694 end
1695
1696 def make_user_from_ap_id(ap_id) do
1697 user = User.get_cached_by_ap_id(ap_id)
1698
1699 if user && !User.ap_enabled?(user) do
1700 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1701 else
1702 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1703 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1704
1705 if user do
1706 user
1707 |> User.remote_user_changeset(data)
1708 |> User.update_and_set_cache()
1709 else
1710 maybe_handle_clashing_nickname(data)
1711
1712 data
1713 |> User.remote_user_changeset()
1714 |> Repo.insert()
1715 |> User.set_cache()
1716 end
1717 end
1718 end
1719 end
1720
1721 def make_user_from_nickname(nickname) do
1722 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1723 make_user_from_ap_id(ap_id)
1724 else
1725 _e -> {:error, "No AP id in WebFinger"}
1726 end
1727 end
1728
1729 # filter out broken threads
1730 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1731 entire_thread_visible_for_user?(activity, user)
1732 end
1733
1734 # do post-processing on a specific activity
1735 def contain_activity(%Activity{} = activity, %User{} = user) do
1736 contain_broken_threads(activity, user)
1737 end
1738
1739 def fetch_direct_messages_query do
1740 Activity
1741 |> restrict_type(%{type: "Create"})
1742 |> restrict_visibility(%{visibility: "direct"})
1743 |> order_by([activity], asc: activity.id)
1744 end
1745 end