purge chat and shout endpoints
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addresses an activity is delivered to.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw values read
# from the activity data (defaulting to `[]`).
#
# For "Create" activities the actor is appended to the recipients and
# duplicates are removed. Every other activity type simply concatenates
# "to", "cc" and "bcc" without de-duplication.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 turns a missing actor into [] instead of the previous [[]],
  # which leaked an empty list into the recipients as if it were an address.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Decides whether the activity's actor is allowed to insert it.
#
# "Delete" and "Undo" activities always pass. Any other activity with a binary
# "actor" passes only when the cached user for that AP id has `is_active: true`.
# Activities without a binary actor pass.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
69
# Returns true when the embedded object's content length does not exceed the
# configured `[:instance, :remote_limit]`. Activities without non-nil object
# content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

defp check_remote_limit(_), do: true
76
@doc """
Increases the actor's note count when `object` is public.

Returns the result of `User.increase_note_count/1`, or `{:ok, actor}` when the
object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
80
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1`, or `{:ok, actor}` when the
object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
84
@doc """
Updates the actor's last-status timestamp when `object` is public.

Returns the result of `User.update_last_status_at/1`, or `{:ok, actor}` when
the object is not public.
"""
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88
# Bumps the replies counter of the parent object for a public "Create" whose
# object carries an "inReplyTo". Returns nil for non-public replies and :noop
# for anything that is not a reply-creating activity.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
99
@object_types ~w[Question Answer Audio Video Event Article Note Page]

# Object-like types are stored through `Object.create/1`; on success the
# created object is returned together with the untouched `meta`.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Everything else is stored as an Activity row. `meta` must contain `:local`
# (Keyword.fetch!/2 raises otherwise).
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  activity_struct = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(activity_struct),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
124
@doc """
Inserts an activity described by `map`.

If `Activity.normalize/1` already finds an activity for `map`, that activity is
returned as `{:ok, activity}` without inserting anything. Otherwise the data
goes through the actor check (skipped when `bypass_actor_check` is true), the
remote content limit check, MRF filtering and containment before the object
and the activity are stored.

With `fake` set to true nothing is stored; an in-memory activity with the id
`"pleroma:fakeid"` is returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media is fetched in a separate task, gated by the concurrency limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Add local posts to search index
    if local do
      Pleroma.Search.add_to_index(activity)
    end

    {:ok, activity}
  else
    # An activity for this data already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = containment_error ->
      containment_error

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error
  end
end
180
# Inserts an Activity row built from `data` and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  insert_result =
    Repo.insert(%Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    })

  case insert_result do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
193
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
202
# Enqueues a PurgeExpiredActivity job when the activity data carries a
# `%DateTime{}` under "expires_at". Returns `{:ok, activity}` on success (or
# when there is nothing to schedule); an enqueue failure is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216
# Creates or bumps the conversation for `activity` and marks it as read for
# the user cached under `actor`. Returns `{:ok, conversation}`, or whichever
# intermediate result did not match.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
224
# Returns the freshly (force-)preloaded participations of a conversation
# result, or an empty list when no `{:ok, conversation}` was given.
defp get_participations({:ok, conversation}) do
  preloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(preloaded, :participations)
end

defp get_participations(_), do: []
232
@doc """
Preloads the `:user` of each participation and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
240
# Streams the participations of the conversation matching the object's
# context, but only when a latest direct activity id can still be found for
# that context from `user`'s point of view.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
260
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; every other activity is a no-op.
@impl true
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
273
@doc """
Runs `do_create/2` inside a database transaction and unwraps its result.

A failed transaction (for example after a rollback) is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
280
# Builds the "Create" data from `params`, inserts it and performs the
# follow-up work (reply counter, note count, last status timestamp,
# notifications/streaming, poll scheduling, federation).
#
# Fake activities stop right after the insert. When `[:env]` is configured as
# `:benchmark` the follow-up work after the reply counter is skipped. Any
# `{:error, reason}` rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra_fields = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  benchmarking? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      extra_fields
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmarking?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
315
# Asks PollWorker to schedule the poll-end job for `activity`; the worker's
# result is deliberately discarded and `:ok` is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
320
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps its result.

A failed transaction is returned unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction_result =
    Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction_result do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Marks the latest follow activity between the two users as "cancelled",
# inserts the matching unfollow activity, then notifies, streams and
# federates it. Returns nil when there is no follow activity; an
# `{:error, reason}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
343
@doc """
Runs `do_flag/1` inside a database transaction and unwraps its result.

A failed transaction is returned unchanged.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Inserts a report ("flag") activity, notifies/streams it, federates a
# stripped copy and e-mails the report to superusers that have an address.
#
# The report is addressed ("cc") to the reported account unless
# `params[:forward]` is explicitly false. An `{:error, reason}` rolls the
# transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    # NOTE(review): `actor` is compared against an ap_id string here but is
    # also passed on to AdminEmail.report/5 — confirm whether callers pass an
    # ap_id or a user; if it is a user this filter never excludes anyone.
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
393
@doc """
Inserts a "Move" activity from `origin` to `target`.

The target must list the origin's AP id in `also_known_as`; otherwise an
error tuple is returned. On success the activity is streamed and federated
and a "move_following" background job is enqueued.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  if origin.ap_id in target.also_known_as do
    # Any non-ok insert result is handed back to the caller untouched.
    with {:ok, activity} <- insert(params, local) do
      notify_and_stream(activity)
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
420
@doc """
Builds the query for "Create" activities that belong to `context`.

The recipients are the public address plus, when `opts[:user]` is set, that
user's own AP id and followed addresses. Results are ordered by descending
activity id.
"""
def fetch_activities_for_context_query(context, opts) do
  viewer = opts[:user]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ [Constants.as_public()]
    else
      [Constants.as_public()]
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, viewer)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
451
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
458
@doc """
Returns the id of the newest direct-visibility "Create" activity in `context`,
or nil when there is none.

Preloading is skipped by default; `opts` may override `:skip_preload`.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
469
# Paginates `query` with `:skip_extra_order` forced on.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
# and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
477
@doc """
Fetches a page of activities addressed to `recipients` or to any list
membership of `opts[:user]`.

The paginated result is reversed and passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  list_memberships = Pleroma.List.memberships(viewer)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> fetch_paginated_optimized(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
486
@doc """
Fetches a page of activities addressed to the public address.

`:user` is dropped from `opts` before querying; unlisted activities are
filtered out only when `opts` asks for it via `:restrict_unlisted`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  base_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  base_query
  |> restrict_unlisted(anonymous_opts)
  |> fetch_paginated_optimized(anonymous_opts, pagination)
end
496
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
forced to true.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
503
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` equals the
# requested visibility (a single value or a list of values).
#
# An unknown visibility is logged and yields a query that matches nothing —
# the same rows the SQL comparison against an unknown value would produce.
# Previously these branches returned Logger.error/1's `:ok` instead of a
# query, which made the next step of the caller's pipeline crash.
#
# A missing or nil `:visibility` leaves the query untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 keeps the log call itself from raising on odd list elements.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    from(a in query, where: false)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# nil is excluded here so that an explicit `visibility: nil` means "no
# restriction", mirroring exclude_visibility/2.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  from(a in query, where: false)
end

defp restrict_visibility(query, _visibility), do: query
540
# Removes activities whose `activity_visibility(...)` equals one of the
# visibilities in `:exclude_visibilities` (a single value or a list).
#
# An unknown visibility is logged and the query is returned unchanged. The
# value is logged through inspect/1: plain interpolation raised for lists
# containing non-string elements and for other non-printable terms, turning
# the error path itself into a crash.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not exclude visibility to #{inspect(visibility)}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      not fragment(
        "activity_visibility(?, ?, ?) = ?",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

defp exclude_visibility(query, _visibility), do: query
583
# Applies the `thread_visibility(...)` SQL check for the viewing user unless
# thread containment is skipped, either via the third argument or via the
# user's own `skip_thread_containment` flag. Without a user the query is
# returned unchanged.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
598
@doc """
Fetches activities authored by `user` as seen by `reading_user`.

`params[:godmode]` lifts the recipient restriction. The list returned by
`fetch_activities/2` is reversed before being returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
613
@doc """
Fetches the Create/Announce activities of `user` as seen by `reading_user`.

With `total: true` in `params` the result is a keyword list whose `:items`
are reversed; otherwise the reversed list of activities is returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  Enum.reverse(fetch_activities_for_user(user, reading_user, params))
end
627
# Shared implementation behind fetch_user_activities/3: restricts to
# Create/Announce activities by `user`, passes the user's pinned object ids
# along, and applies the reader's block/mute filters unless the reader blocks
# `user`. Pagination defaults to :keyset.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
654
@doc """
Fetches Create/Announce activities visible to `reading_user` using offset
pagination.

With `total: true` in `params` the result is a keyword list whose `:items`
are reversed; otherwise the reversed list of activities is returned.
"""
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  Enum.reverse(fetch_activities_for_reading_user(reading_user, params))
end
665
# Shared implementation behind fetch_statuses/2: Create/Announce activities
# for the reader's recipients, fetched with :offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
676
# Recipient addresses used when listing a user's activities: none in godmode,
# otherwise the public address plus — when there is a reader — the reader's
# own AP id and followed addresses.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
686
# Drops Announce activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
706
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing key leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
714
# Keeps activities whose object's embedded "tag" array contains ALL of the
# `:tag_all` values. A single binary is delegated to the "any" variant.
# Raises when preloading is skipped, since the joined object is required.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
731
# Keeps activities whose object's embedded "tag" array contains ANY of the
# `:tag` values (a single binary is treated as a one-element list).
# Raises when preloading is skipped, since the joined object is required.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
748
# Drops activities whose object's embedded "tag" array contains ANY of the
# `:tag_reject` values (a single binary is treated as a one-element list).
# Raises when preloading is skipped, since the joined object is required.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
766
# Query selecting the distinct object ids linked (through "hashtags_objects")
# to any hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
774
# Keeps activities whose object is linked to ALL hashtags in `:tag_all`.
# A one-element list is delegated to restrict_hashtag_any/2, a single binary
# is wrapped into a list first. Raises when preloading is skipped, since the
# joined object is required.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
805
# Keeps activities whose object is linked to ANY hashtag in `:tag` (a single
# binary is treated as a one-element list). The hashtag ids are looked up
# with a separate query first. Raises when preloading is skipped, since the
# joined object is required.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
831
# Drops activities whose object is linked to ANY hashtag in `:tag_reject`
# (a single binary is treated as a one-element list). Raises when preloading
# is skipped, since the joined object is required.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
848
# Raised by the tag filters, which need the joined child object and therefore
# cannot work with `skip_preload: true`.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
852
# Keeps activities whose recipients overlap with `recipients`. With a user,
# activities authored by that user are kept as well. An empty recipient list
# applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
866
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
872
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
878
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
884
# Keeps only activities of the given type. A binary `:type` is compared for
# equality; any other value is matched with SQL `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
894
# Keeps only activities whose data "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
900
# Keeps only activities whose object's "likes" contain the `:favorited_by`
# AP id, when given.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
909
# With `only_media: true`, keeps only "Create" activities whose object's
# "attachment" is not equal to an empty list. Raises when `:only_media` is
# combined with `skip_preload: true`, since the joined object is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
923
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and replies that have the
#     filtering user among the activity recipients;
#   * `reply_visibility: "following"` applies the SQL conditions documented
#     inline in the fragment below.
# Any other options leave the query unchanged.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
977
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
983
# Hides activities by muted actors and activities addressed ("to") to muted
# users, except for the muting user's own activities. Unless preloading is
# skipped, activities with a thread mute row for the user are hidden too.
# `with_muted: true` disables the filter; a precomputed mute list can be
# supplied through `:muted_users_ap_ids`.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    # Relies on the named :thread_mute binding being present on the query.
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1010
# Hides activities the `:blocking_user` should not see because of their user
# or domain blocks. The object is joined on demand when the query has no
# named :object binding yet. A precomputed block list can be supplied through
# `:blocked_users_ap_ids`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  query_with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  from(
    [activity, object: o] in query_with_object,
    # You don't block the author
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),

    # You don't block any recipients, and didn't author the post
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),

    # You don't block the domain of any recipients, and didn't author the post
    where:
      fragment(
        "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
        activity.recipients,
        ^domain_blocks,
        activity.actor,
        ^user.ap_id
      ),

    # It's not a boost of a user you block
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),

    # You don't block the author's domain, and also don't follow the author
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),

    # Same as above, but checks the Object
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
1077
# Unless `[:activitypub, :blockers_visible]` is configured as true, hides
# activities authored by users who block the `:blocking_user`, as well as
# boosts addressed ("to") to such users.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  if Config.get([:activitypub, :blockers_visible]) == true do
    query
  else
    blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

    query
    # The author doesn't block you
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
    # It's not a boost of a user that blocks you
    |> where(
      [activity],
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocker_ap_ids
      )
    )
  end
end

defp restrict_blockers_visibility(query, _), do: query
1102
# With `restrict_unlisted: true`, drops activities whose "cc" contains the
# public collection address (`Constants.as_public/0`). A missing "cc" is
# coalesced to an empty JSON value in SQL so such activities are kept.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1116
# With `pinned: true`, keeps only Create activities whose object id is one of
# `:pinned_object_ids`. The object id is read from the activity JSON, either
# from an embedded object's "id" or from a plain "object" reference.
#
# The `:object` named binding is declared (as in the original query) although
# the condition itself only reads the activity data.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
      activity.data,
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _), do: query
1132
# Drops Announce activities made by actors whose reblogs the `:muting_user`
# has muted. `opts[:reblog_muted_users_ap_ids]` may carry a preloaded list;
# otherwise it is loaded with `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1149
# With a binary `:instance`, keeps only activities whose actor URL has that
# value as its third "/"-separated segment (the host part of the URL).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
1158
# Applies the user's content filters: when `Filter.compose_regex/1` yields a
# regex, activities whose object "content" matches it (case-insensitively) are
# dropped, except for activities the user authored. A `nil` regex leaves the
# query unchanged.
#
# The object is addressed positionally as the second binding of the query.
# The user is taken from `:user`, falling back to `:blocking_user`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _), do: query
1178
# Unless `include_poll_votes: true` is given, drops activities whose joined
# object has type "Answer". Queries without an `:object` named binding are
# returned unchanged because there is nothing to test against.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1190
# Unless `invisible_actors: true` is given, drops activities authored by users
# flagged as invisible. The AP ids of those users are loaded with a separate
# query before the filter is attached.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  hidden_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^hidden_ap_ids)
end
1201
# With a binary `:exclude_id`, drops the activity that has exactly that id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _), do: query
1207
# Preloads the activity's object via `Activity.with_preloaded_object/1`
# unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1214
# Preloads the bookmark of `opts[:user]` via
# `Activity.with_preloaded_bookmark/2` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1221
# Preloads report notes via `Activity.with_preloaded_report_notes/1`, but only
# when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1228
# Sets the thread-muted field via `Activity.with_set_thread_muted_field/2` for
# `opts[:muting_user]` (falling back to `opts[:user]`) unless
# `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1235
# Orders by id when `:order` is `:desc` or `:asc`; any other options leave the
# query's ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1247
# Normalizes the hashtag options `:tag`, `:tag_all` and `:tag_reject` with
# `Hashtag.normalize_name/1`. A single name is normalized in place; a list is
# normalized element-wise and de-duplicated. Other values (including a missing
# key) are left as they are.
defp normalize_fetch_activities_query_opts(opts) do
  Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn tag_key, acc ->
    case acc[tag_key] do
      names when is_list(names) ->
        unique_names = names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
        Map.put(acc, tag_key, unique_names)

      name when is_bitstring(name) ->
        Map.put(acc, tag_key, Hashtag.normalize_name(name))

      _ ->
        acc
    end
  end)
end
1267
# Loads the muting user's outgoing relationships in one call and returns three
# option maps, each being `opts` with one preloaded AP id list added as a
# default: `{restrict_blocked_opts, restrict_muted_opts,
# restrict_muted_reblogs_opts}`.
#
# Block relationships are only loaded together with the mutes when the
# blocking user is the same as the muting user. Keys already present in `opts`
# win over the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_kinds = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_kinds =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_kinds]
    else
      mute_kinds
    end

  preloaded = User.outgoing_relationships_ap_ids(muting_user, relationship_kinds)

  # `opts` is the second argument of Map.merge/2, so caller-supplied values
  # take precedence over the preloaded ones.
  with_default = fn opt_key, relationship ->
    Map.merge(%{opt_key => preloaded[relationship]}, opts)
  end

  {
    with_default.(:blocked_users_ap_ids, :block),
    with_default.(:muted_users_ap_ids, :mute),
    with_default.(:reblog_muted_users_ap_ids, :reblog_mute)
  }
end
1289
@doc """
Builds the Ecto query used to fetch activities for `recipients`.

The hashtag options in `opts` are normalized first, then the muting user's
block/mute relationships are preloaded so the individual restriction steps
don't have to load them again. After that the preload, ordering, restriction
and exclusion steps are applied in sequence; each step inspects `opts` and
leaves the query untouched when its option is absent.

Hashtag filtering uses the hashtag-table based restrictions when the
`:improved_hashtag_timeline` feature is enabled and the embedded-tag based
ones otherwise.

Returns the composed query; nothing is executed against activities here.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: a second application would only repeat the same
    # WHERE condition and compose the user's filter regex a second time.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1346
@doc """
Fetches the activities that `user` has favourited.

The query starts from the user's "Like" activities, joins the liked object
and the activity belonging to that object, and returns that activity with
`:object` set to the joined object and `:pagination_id` set to the id of the
Like. Results are ordered by the Like's id, descending, so pagination follows
the order in which favourites were added rather than the order of the posts.
`skip_order: true` is passed to the paginator alongside `params`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  favourites =
    likes_with_targets
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.merge(params, %{skip_order: true})

  Pagination.fetch_paginated(favourites, pagination_params, pagination)
end
1364
# Adds the user's AP id to the "cc" of every activity whose non-empty "bcc"
# contains at least one of the given `list_memberships`. Activities without a
# matching "bcc" are returned unchanged, and so is the whole list when
# `list_memberships` is empty or no `%User{}` is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single string in the activity JSON;
        # List.wrap/1 keeps the result a proper list in both cases instead of
        # producing an improper `[ap_id | nil]` / `[ap_id | "string"]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1380
# Keeps activities that either overlap `recipients`, or overlap
# `recipients_with_public` while also being addressed to the public
# collection (`Constants.as_public/0`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1389
@doc """
Fetches a page of activities bounded by two recipient lists.

The base query is built by `fetch_activities_query/2` with an empty recipient
list; the recipient restriction is then applied separately: an activity
qualifies when it is addressed to one of `recipients`, or when it is public
and addressed to one of `recipients_with_public`. The page returned by
`Pagination.fetch_paginated/3` is reversed before it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    fetch_activities_bounded_query(
      fetch_activities_query([], opts),
      recipients,
      recipients_with_public
    )

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1401
@doc """
Stores `file` with `Upload.store/2` and inserts an `Object` holding the
resulting data.

`opts[:actor]` is handed to `Maps.put_if_present/3` under the `"actor"` key
before the object is inserted. Any result of `Upload.store/2` other than
`{:ok, data}` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1410
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a link map with a binary "href", or a list of either (the first
# element is used). Anything else, including an empty list, yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1422
# Converts an image value (a map with "url", or a list whose first element is
# such a map) into `%{"type" => "Image", "url" => [%{"href" => url}]}`.
# Anything else, including an empty list, yields nil.
defp normalize_image(%{"url" => url}),
  do: %{"type" => "Image", "url" => [%{"href" => url}]}

defp normalize_image([first | _rest]), do: normalize_image(first)
defp normalize_image(_), do: nil
1432
# Builds the attribute map for a remote user from a fetched actor document
# (`data`, a map with string keys).
#
# Profile fields come from "attachment" entries of type "PropertyValue",
# custom emoji from "tag" entries of type "Emoji". The featured collection
# referenced by "featured" is fetched to obtain the pinned objects. The
# returned map always has a `:nickname` key; it is nil when the document has
# no "preferredUsername".
defp object_to_user_data(data) do
  # The document comes from a remote server and is untrusted: "attachment"
  # and "tag" may be missing, be a single object instead of a list, or hold
  # entries of an unexpected shape. List.wrap/1 normalizes the container and
  # the comprehension patterns skip malformed entries instead of raising.
  fields =
    for %{"type" => "PropertyValue"} = field <- data |> Map.get("attachment") |> List.wrap(),
        do: Map.take(field, ["name", "value"])

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          data |> Map.get("tag") |> List.wrap(),
        is_binary(name),
        into: %{},
        do: {String.trim(name, ":"), url}

  # Read before maybe_fix_user_object/1 runs, exactly as before.
  is_locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    pinned_objects: pinned_objects
  }

  # nickname can be nil because of virtual actors
  if data["preferredUsername"] do
    Map.put(
      user_data,
      :nickname,
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    )
  else
    Map.put(user_data, :nickname, nil)
  end
end
1509
@doc """
Fetches the remote following and follower collections of `user` and derives
counters and privacy flags from them.

On success returns `{:ok, map}` with `:follower_count`, `:following_count`
(the collections' integer "totalItems", or 0), `:hide_follows` and
`:hide_followers`. The first failing step ends the chain: an `{:error, _}`
tuple is returned as is and any other value is wrapped as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden?} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden?} <- collection_private(followers) do
    counters = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"])
    }

    privacy = %{hide_follows: follows_hidden?, hide_followers: followers_hidden?}

    {:ok, Map.merge(counters, privacy)}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1529
# Returns `counter` when it is an integer and 0 for anything else (for
# example a missing "totalItems").
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1532
@doc """
Merges remote follower/following information into `user_data` under `:info`
when all preconditions hold; otherwise returns `user_data` unchanged.

Preconditions, checked in order: the
`[:instance, :external_user_synchronization]` setting is `true`,
`user_data[:type]` is "Person" or "Service", and both `:following_address`
and `:follower_address` are set. A precondition that evaluates to `false` is
skipped silently; any other failure (including a failed fetch) is logged as
an error. This function never returns an error itself.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): this reads `user_data[:type]`, while the map built by
  # `object_to_user_data/1` in this module stores the actor type under
  # `:actor_type` - confirm which key callers are expected to provide.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1563
# Decides whether a follower/following collection is private.
#
# Returns `{:ok, false}` when the collection's "first" page is embedded as a
# (Ordered)CollectionPage or can be fetched as one, and `{:ok, true}` when
# fetching it is answered with HTTP 401/403 or the collection has no "first"
# at all. Other fetch results are returned as `{:error, _}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1580
@doc """
Runs the actor document `data` through `MRF.filter/1` and converts the
filtered document into user attributes.

Returns `{:ok, user_data}` on success. Any result of the MRF other than
`{:ok, data}` is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1588
@doc """
Fetches the actor document at `ap_id` and turns it into user attributes,
including follow information where `maybe_update_follow_information/1`
provides it.

Returns `{:ok, user_data}` or `{:error, reason}`. The log level of a failure
depends on the reason: a deleted object is logged at debug level, an MRF
rejection at info level and everything else as an error.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted actor is an expected situation, so it is not logged as an error.
    {:error, "Object has been deleted" = deleted} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(deleted)}")
      {:error, deleted}

    {:error, {:reject, reason} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
      {:error, rejection}

    {:error, failure} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1608
@doc """
Frees up `data[:nickname]` when it is already taken by a different user.

If a user with that nickname exists and has another AP id, the existing user
is renamed to `"<id>.<nickname>"` and the result of the update is returned.
If the AP ids are equal nothing is changed and only an info message is
logged. Returns nil when `data` has no binary nickname or no user holds it.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

      old_user
      |> User.remote_user_changeset(renamed)
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1630
@doc """
Turns a featured collection into a map of `object_ap_id => pinned_at`, where
`pinned_at` is the current UTC time.

An "OrderedCollection" that references its items through "first" has that
page fetched and its "orderedItems" used; any other "OrderedCollection" or
"Collection" is resolved with `Collections.Fetcher.fetch_collection/1`.

Always returns a plain map. When the collection cannot be fetched the failure
is logged and an empty map is returned. Items may be objects with a binary
"id" or bare id strings; anything else is skipped.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{} = page} ->
      page
      |> Map.get("orderedItems")
      |> featured_items_to_pins()

    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      # Bare map, matching the success path: the caller wraps this function's
      # result in `{:ok, _}` itself.
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      featured_items_to_pins(objects)

    e ->
      Logger.error(
        "Could not fetch featured collection #{inspect(collection["id"])}, #{inspect(e)}"
      )

      %{}
  end
end

# Maps collection items to `%{ap_id => timestamp}`. Non-list input (such as a
# missing "orderedItems") yields an empty map.
defp featured_items_to_pins(items) when is_list(items) do
  items
  |> Enum.flat_map(fn
    %{"id" => ap_id} when is_binary(ap_id) -> [ap_id]
    ap_id when is_binary(ap_id) -> [ap_id]
    _ -> []
  end)
  |> Map.new(fn ap_id -> {ap_id, NaiveDateTime.utc_now()} end)
end

defp featured_items_to_pins(_items), do: %{}
1655
@doc """
Fetches the featured collection at `ap_id` and converts it with
`pin_data_from_featured_collection/1`.

Always returns an `{:ok, _}` tuple: `{:ok, %{}}` for a nil `ap_id` or when the
collection cannot be fetched (the failure is logged as an error), and
`{:ok, pins}` otherwise.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
1669
@doc """
Makes sure every pinned object in `pinned_objects` is available locally.

Each AP id is looked up in the object cache first and fetched with
`Fetcher.fetch_object_from_id/1` when it is not cached. Checking stops at the
first object that can be neither found nor fetched. Returns `:ok` when all
objects are available, `:error` otherwise, and nil when given nil.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
1682
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched;
an existing user is updated with the fetched data, a new one is inserted
after a possible nickname clash has been resolved. Fetching of the pinned
objects is started in a separate task and not awaited. A failed fetch is
returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  user = User.get_cached_by_ap_id(ap_id)

  cond do
    user && !User.ap_enabled?(user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

        if user do
          changeset = User.remote_user_changeset(user, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data)

          inserted = Repo.insert(User.remote_user_changeset(data))
          User.set_cache(inserted)
        end
      end
  end
end
1707
@doc """
Resolves `nickname` with WebFinger and creates or refreshes the user found at
the resulting AP id.

Returns the result of `make_user_from_ap_id/1`, or
`{:error, "No AP id in WebFinger"}` when the WebFinger lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1715
# Thread containment check: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1720
@doc """
Post-processing check for a single activity: returns the result of the
thread containment check for `activity` as seen by `user`.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1725
@doc """
Builds a query for all "Create" activities with direct visibility, ordered by
activity id ascending.
"""
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{type: "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1732 end