Merge pull request 'purge scrobbling' (#90) from purge/scrobbling into develop
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Akkoma.Collections
7 alias Pleroma.Activity
8 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Hashtag
15 alias Pleroma.Maps
16 alias Pleroma.Notification
17 alias Pleroma.Object
18 alias Pleroma.Object.Containment
19 alias Pleroma.Object.Fetcher
20 alias Pleroma.Pagination
21 alias Pleroma.Repo
22 alias Pleroma.Upload
23 alias Pleroma.User
24 alias Pleroma.Web.ActivityPub.MRF
25 alias Pleroma.Web.ActivityPub.Transmogrifier
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Web.WebFinger
28 alias Pleroma.Workers.BackgroundWorker
29 alias Pleroma.Workers.PollWorker
30
31 import Ecto.Query
32 import Pleroma.Web.ActivityPub.Utils
33 import Pleroma.Web.ActivityPub.Visibility
34
35 require Logger
36 require Pleroma.Constants
37
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
39 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`. `to`, `cc` and `bcc` are read from `data`
# and default to `[]` when the key is absent.
#
# For "Create" activities the actor is added to the recipients and duplicates
# are removed; for every other type the three lists are simply concatenated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing (or nil) actor must contribute nothing. Defaulting to `[]` and
  # then wrapping it would put an empty list into the recipients.
  actor =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
57
# Decides whether the actor of an activity is allowed to insert it.
#
# "Delete" and "Undo" activities always pass. Any other activity with a
# binary actor passes only when the cached user for that AP id is active.
# Activities without a binary actor pass.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
69
# Checks the length of the embedded object's content against the configured
# `[:instance, :remote_limit]`. Activities without a non-nil content pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
76
# Increases the actor's note count when `object` is public; otherwise
# returns `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
80
# Decreases the actor's note count when `object` is public; otherwise
# returns `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
84
# Calls `User.update_last_status_at/1` for the actor when `object` is public;
# otherwise returns `{:ok, actor}` untouched.
def update_last_status_at_if_public(actor, object) do
  if is_public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88
# For a "Create" whose embedded object has an "inReplyTo" key, bumps the
# replies counter of the replied-to object, but only when the new object is
# public. Every other input is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => reply_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(reply_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
99
@object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]

# Persists `object`.
#
# Data whose "type" is one of `@object_types` is stored through
# `Object.create/1`; anything else is inserted as an `Activity` (which
# requires `:local` to be present in `meta`). On success the result is
# `{:ok, stored, meta}`; a failing step is returned as-is.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  struct = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(struct),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
124
# Inserts the activity described by `map`.
#
# The steps run in order and the first failing one decides the result:
#   * an activity that `Activity.normalize/1` already finds is returned as-is
#   * the actor check (skipped with `bypass_actor_check`) -> `{:error, false}`
#   * the remote content limit -> `{:error, :remote_limit}`
#   * the MRF filter -> `{:error, {:reject, _}}` or the MRF's own error
#   * with `fake` set, an unsaved activity with a fake id is returned
#   * containment of the child object, then the actual inserts
#
# After a real insert, rich media is fetched in a background task and local
# activities are added to the search index.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {:actor_check, true} <-
         {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
       {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Attach the child object to the activity when one was inserted.
    activity = Maps.put_if_present(activity, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    # Only local posts go into the search index here.
    if local do
      Pleroma.Search.add_to_index(activity)
    end

    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:fake, true, map, recipients} ->
      fake_activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error
  end
end
180
# Inserts a new `Activity` row built from `data` and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    other -> other
  end
end
193
# Creates notifications for `activity`, creates or bumps its conversation,
# then streams out the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
202
# Enqueues a `PurgeExpiredActivity` job when the activity data carries an
# "expires_at" `DateTime`. Returns `{:ok, activity}` on success (or when
# there is nothing to schedule); a failed enqueue is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
216
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# produce the expected value, that value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
224
# Returns the freshly (force-)preloaded participations of a conversation
# given as `{:ok, conversation}`, or `[]` for any other input.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
232
# Preloads the user of each participation and pushes them to the
# "participation" stream.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
240
# Streams the participations of the conversation an object belongs to, but
# only if a direct activity is found in that context for `user`. Objects
# without a "context" (and any other input) are a no-op.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  with %Conversation{} = found <- Conversation.get_for_ap_id(context) do
    conversation = Repo.preload(found, :participations)
    lookup_opts = %{user: user, blocking_user: user}

    if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
      stream_out_participations(conversation.participations)
    end
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
260
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by `Topics.get_activity_topics/1`. Other activities are not streamed.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ~w[Create Announce Delete] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
273
# Runs `do_create/2` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
280
# Builds and inserts a "Create" activity, then runs the follow-up work:
# reply counter, note count, last-status timestamp, notifications/streaming,
# poll notifications and federation.
#
# Fake activities stop right after the insert. In the `:benchmark` env the
# work after the reply counter is skipped. An `{:error, _}` from any step
# rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # Anything other than an explicit `false` counts as local.
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ = increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
315
# Hands the activity to `PollWorker.schedule_poll_end/1`; the worker's result
# is ignored and `:ok` is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
320
# Runs `do_unfollow/4` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
329
# Cancels the latest follow from `follower` to `followed` and inserts the
# matching unfollow activity, then notifies, streams and federates it.
# Returns `nil` when a step yields `nil` (e.g. no follow activity is found);
# `{:error, _}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
343
# Runs `do_flag/1` inside a database transaction and unwraps the
# transaction's `{:ok, result}`; a rolled-back transaction is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
350
# Inserts a report ("flag") activity, federates a stripped copy of it and
# mails the report to every superuser that has an email address and is not
# the reporter. `{:error, _}` from any step rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # Anything other than an explicit `false` counts as true for both flags.
  local = params[:local] != false
  forward = params[:forward] != false

  # The reported account is only addressed (via "cc") when forwarding.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data = make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
393
# Inserts a "Move" activity from `origin` to `target`, addressed to the
# origin's followers, then federates it and enqueues the "move_following"
# background job. The target must list the origin in `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local) do
    notify_and_stream(activity)
    maybe_federate(activity)

    BackgroundWorker.enqueue("move_following", %{
      "origin_id" => origin.id,
      "target_id" => target.id
    })

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
420
# Builds the query for the "Create" activities of `context`, newest first.
#
# Visible recipients are the public address plus, when `opts[:user]` is set,
# that user and everyone they follow. Block/filter/poll-vote/id restrictions
# are applied from `opts` by the helpers in the pipeline.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [a],
    fragment("?->>'type' = ? and ?->>'context' = ?", a.data, "Create", a.data, ^context)
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
451
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
458
# Returns the id of the first direct-visibility activity produced by the
# context query (which orders by id descending), or `nil` when there is none.
# Preloading is skipped by default; `opts` may override that.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
469
# Paginates `query` with `:skip_extra_order` forced on.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", and extra
# sorting on "activities.id DESC NULLS LAST" would worsen the query plan.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
477
# Fetches a page of activities addressed to `recipients` or to any list the
# requesting user (`opts[:user]`) is a member of. The paginated result is
# reversed and then passed through `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> fetch_paginated_optimized(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(page, list_memberships, user)
end
486
# Fetches a page of activities addressed to the public collection. The
# `:user` option is dropped first; unlisted activities are filtered out only
# when `opts` asks for it via `:restrict_unlisted`.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  [Constants.as_public()]
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> fetch_paginated_optimized(anonymous_opts, pagination)
end
496
# Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
# forced to `true`.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
503
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed visibility matches
# `opts[:visibility]`, which may be a single visibility or a list of them.
#
# An invalid value is logged and the query is returned unrestricted, the same
# way `exclude_visibility/2` treats invalid input. Callers pipe the result
# into further query functions, so a query must always be returned; returning
# the result of `Logger.error/1` would break that pipeline. A missing or
# `nil` visibility means "no restriction".
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
540
# Removes activities whose computed visibility matches
# `opts[:exclude_visibilities]` (a single visibility or a list of them).
# Invalid values are logged and the query is returned unchanged; a missing or
# `nil` value leaves the query as it is.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
583
# Applies the `thread_visibility` SQL check for the requesting user, unless
# thread containment is skipped through the third argument or on the user
# itself, or there is no user to check against.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
598
# Fetches the activities authored by `user` as seen by `reading_user`,
# without limiting the activity type. The list returned by
# `fetch_activities/2` is reversed once more before being returned.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
613
# Fetches the "Create"/"Announce" activities of `user` as seen by
# `reading_user`, reversing the order produced by
# `fetch_activities_for_user/3`. With `total: true` the result is a keyword
# list and only its `:items` entry is reversed.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
627
# Shared implementation of `fetch_user_activities/3`.
#
# Restricts to "Create"/"Announce" activities authored by `user` and passes
# the user's pinned object ids along. Block and mute filtering for the reader
# is only enabled when the reader does not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
654
# Fetches "Create"/"Announce" activities visible to `reading_user`, reversing
# the order produced by `fetch_activities_for_reading_user/2`. With
# `total: true` the result is a keyword list and only `:items` is reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])
  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
665
# Shared implementation of `fetch_statuses/2`: "Create"/"Announce" activities
# for the reader's recipients, always with offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
676
# Recipient list used when reading a user's activities: empty in godmode
# (`restrict_recipients/3` leaves the query untouched for `[]`), otherwise the
# public address plus, for a known reader, the reader and everyone they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
686
# Drops "Announce" activities whose announced object was authored by the
# given `:announce_filtering_user`. Needs the joined object, so it raises
# when combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
706
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
714
# Keeps activities whose object's embedded "tag" data contains all of
# `:tag_all`. A single binary tag is delegated to the "any" variant.
# Needs the joined object, so it raises with `skip_preload: true`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
731
# Keeps activities whose object's embedded "tag" data contains at least one
# of `:tag`. A single binary tag is wrapped into a list.
# Needs the joined object, so it raises with `skip_preload: true`.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
748
# Drops activities whose object's embedded "tag" data contains any of
# `:tag_reject`. A single binary tag is wrapped into a list.
# Needs the joined object, so it raises with `skip_preload: true`.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
766
# Query selecting the distinct ids of objects linked (through
# "hashtags_objects") to a hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
774
# Keeps activities whose object is linked to every hashtag in `:tag_all`.
# A one-element list is delegated to `restrict_hashtag_any/2`; a binary tag
# is wrapped into a list. Needs the joined object, so it raises with
# `skip_preload: true`.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
805
# Keeps activities whose object is linked to at least one hashtag in `:tag`.
# The hashtag ids are looked up first, then the query joins
# "hashtags_objects" and is made distinct on (and ordered by) the object id.
# Needs the joined object, so it raises with `skip_preload: true`.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids =
    Hashtag
    |> where([ht], ht.name in ^tags)
    |> select([ht], ht.id)
    |> Repo.all()

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: link in "hashtags_objects",
    on: link.object_id == object.id,
    where: link.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
831
# Drops activities whose object is linked to any hashtag in `:tag_reject`.
# A single binary tag is wrapped into a list. Needs the joined object, so it
# raises with `skip_preload: true`.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
848
# Raised by filters that need the joined object when `skip_preload: true`
# was requested.
defp raise_on_missing_preload, do: raise("Can't use the child object without preloading!")
852
# Keeps activities addressed to at least one of `recipients`. When a user is
# given, activities authored by that user are kept as well. An empty
# recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
866
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
872
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
878
# Keeps only activities whose actor equals `:actor_id`, when that is given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
884
# Keeps only activities of the given `:type`. A binary is compared for
# equality; any other value is handed to SQL `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
894
# Keeps only activities whose data "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
900
# Keeps only activities whose object's "likes" contain the `:favorited_by`
# AP id, when given.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
909
# With `only_media: true`, keeps only "Create" activities whose object's
# "attachment" is not an empty list. Needs the joined object, so any
# `:only_media` value combined with `skip_preload: true` raises.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
923
# Reply filtering, depending on the options:
#   * `exclude_replies: true` keeps only objects without "inReplyTo"
#   * `reply_visibility: "self"` keeps non-replies and replies addressed to
#     the filtering user
#   * `reply_visibility: "following"` keeps what the SQL comments in the
#     fragment below describe
# Anything else leaves the query untouched.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
977
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
983
# Hides activities from muted users for `:muting_user`, unless
# `with_muted: true` is set.
#
# Activities authored by a muted user are dropped, as are activities with a
# muted user in "to" unless the muting user is the author. Unless preloading
# is skipped, rows that have a `:thread_mute` binding match are dropped too.
# The mute list can be supplied via `:muted_users_ap_ids`.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    from([thread_mute: tm] in without_muted_users, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1010
# Hides activities involving users or domains blocked by `:blocking_user`.
# The object is joined first when the query has no `:object` binding yet.
# The blocked-user list can be supplied via `:blocked_users_ap_ids`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  from(
    [activity, object: o] in with_object,
    # The author is not blocked
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),

    # No recipient is blocked, unless the blocking user wrote the post
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),

    # No recipient is on a blocked domain, unless the blocking user wrote the post
    where:
      fragment(
        "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
        activity.recipients,
        ^domain_blocks,
        activity.actor,
        ^user.ap_id
      ),

    # Not a boost of a blocked user
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),

    # The author's domain is not blocked, unless the author is followed
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),

    # The same domain check, applied to the object's actor
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
1077
# Unless `[:activitypub, :blockers_visible]` is exactly `true`, hides activities authored by
# accounts that block `:blocking_user`, as well as Announces addressed to such accounts.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = viewer}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blockers = User.incoming_relationships_ungrouped_ap_ids(viewer, [:block])

      query
      # The author does not block the viewer
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blockers))
      # Not an Announce addressed to an account that blocks the viewer
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blockers
        )
      )
  end
end

defp restrict_blockers_visibility(query, _), do: query
1102
# With `restrict_unlisted: true`, drops activities whose "cc" contains the public address.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1116
# With `pinned: true`, keeps only `Create` activities whose object id is listed in
# `:pinned_object_ids`.
#
# The condition reads nothing but `activity.data`, so only the activity binding is requested.
# Asking for a named `:object` binding here would tie this clause to a join it never uses.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  from(
    activity in query,
    where:
      fragment(
        "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
        activity.data,
        activity.data,
        activity.data,
        ^ids
      )
  )
end

defp restrict_pinned(query, _), do: query
1132
# Hides Announces made by accounts whose reblogs `:muting_user` has muted.
# A `:reblog_muted_users_ap_ids` list already present in opts is used instead of a lookup.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = muter} = opts) do
  silenced_boosters = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(muter)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^silenced_boosters
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1149
# With a binary `:instance`, keeps only activities whose actor URL has that host part.
defp restrict_instance(query, %{instance: host}) when is_binary(host) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^host))
end

defp restrict_instance(query, _), do: query
1158
# Hides activities whose object content matches the user's composed filter regex, except
# for activities the user authored. Falls back to `:blocking_user` when no `:user` is given.
#
# The object is addressed through the named `:object` binding (joined on demand), matching
# `restrict_blocked/2`, rather than by assuming the object join is the second positional
# binding of the query.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      query =
        if has_named_binding?(query, :object),
          do: query,
          else: Activity.with_joined_object(query)

      from([activity, object: o] in query,
        where:
          fragment("not(?->>'content' ~* ?)", o.data, ^regex) or
            activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1178
# Drops activities whose object type is "Answer", unless `include_poll_votes: true` is set
# or the query carries no named `:object` binding to filter on.
defp exclude_poll_votes(query, opts) do
  cond do
    match?(%{include_poll_votes: true}, opts) ->
      query

    has_named_binding?(query, :object) ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    true ->
      query
  end
end
1190
# Drops activities whose object type is "ChatMessage", unless `include_chat_messages: true`
# is set or the query carries no named `:object` binding to filter on.
defp exclude_chat_messages(query, opts) do
  cond do
    match?(%{include_chat_messages: true}, opts) ->
      query

    has_named_binding?(query, :object) ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))

    true ->
      query
  end
end
1202
# Unless `invisible_actors: true` is set, removes activities authored by users flagged
# as invisible. The invisible users' ap_ids are loaded up front and passed as a parameter.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  hidden_actor_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^hidden_actor_ids)
end
1213
# With a binary `:exclude_id`, leaves out the activity that has exactly that id.
defp exclude_id(query, %{exclude_id: skipped_id}) when is_binary(skipped_id) do
  where(query, [activity], activity.id != ^skipped_id)
end

defp exclude_id(query, _), do: query
1219
# Preloads the object of each activity unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1226
# Preloads the bookmark of `opts[:user]` for each activity unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
defp maybe_preload_bookmarks(query, opts), do: Activity.with_preloaded_bookmark(query, opts[:user])
1233
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1240
# Sets the thread-muted field for `opts[:muting_user]` (falling back to `opts[:user]`)
# unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1247
# Orders by id in the direction given by `:order` (`:desc` or `:asc`); any other opts
# leave the query's ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1259
# Runs the `:tag`, `:tag_all` and `:tag_reject` options through `Hashtag.normalize_name/1`.
# A single name is normalized in place; a list is normalized and de-duplicated.
# Keys holding anything else (including absent keys) are left as they are.
defp normalize_fetch_activities_query_opts(opts) do
  for tag_key <- [:tag, :tag_all, :tag_reject], reduce: opts do
    acc ->
      case acc[tag_key] do
        names when is_list(names) ->
          unique_names = names |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
          Map.put(acc, tag_key, unique_names)

        name when is_bitstring(name) ->
          Map.put(acc, tag_key, Hashtag.normalize_name(name))

        _ ->
          acc
      end
  end
end
1279
# Loads the muting user's outgoing relationship ap_ids in a single call and returns three
# copies of opts, for the block, mute and reblog-mute restrictions respectively. Each copy
# gains its preloaded id list only when opts does not already define that key.
# Blocks are preloaded only when `:blocking_user` is set and equals `:muting_user`.
defp fetch_activities_query_ap_ids_ops(opts) do
  muter = opts[:muting_user]
  mute_kinds = if muter, do: [:mute, :reblog_mute], else: []

  relationship_kinds =
    if opts[:blocking_user] && opts[:blocking_user] == muter,
      do: [:block | mute_kinds],
      else: mute_kinds

  preloaded = User.outgoing_relationships_ap_ids(muter, relationship_kinds)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1301
@doc """
Builds the activity query for `recipients`, narrowed by every restriction found in `opts`.

Tag options are normalized first and the viewer's relationship ap_ids are preloaded once
and handed to the block / mute / reblog-mute restrictions. Hashtag filtering goes through
the hashtag table when the `:improved_hashtag_timeline` feature is enabled and through the
tags embedded in the object otherwise.

Returns a query; nothing is executed here.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: a second pass would only repeat the same WHERE clause
    # and recompute the user's filter regex.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1359
@doc """
Fetches the activities a user has favourited, ordered by the id of the corresponding
`Like` activity in descending order. That `Like` id is also exposed as `pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is set above, so pagination is told not to apply its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1377
# For every activity whose "bcc" mentions one of `list_memberships`, prepends the user's
# ap_id to the activity's "cc". Activities without a non-empty "bcc" pass through unchanged,
# and so does the whole list when there are no list memberships or no `%User{}`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing or a bare string; wrap it so the result is a proper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1393
# Keeps activities addressed to any of `recipients`, plus activities that are addressed to
# any of `recipients_with_public` and also list the public address among their recipients.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1402
@doc """
Fetches one page of activities bounded by two recipient lists.

The base query is built by `fetch_activities_query/2` with an empty recipient list, then
narrowed to `recipients` / `recipients_with_public`, paginated, and the fetched page is
returned in reverse order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1414
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the returned data,
# with "actor" added when `opts[:actor]` is present. Any result of the store step other
# than `{:ok, data}` is returned untouched.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1423
# Extracts a URL string from an actor's "url" value: a plain string is returned as is,
# a map yields its binary "href", a list is resolved through its first element, and
# anything else gives nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    candidates when is_list(candidates) -> candidates |> List.first() |> get_actor_url()
    _ -> nil
  end
end
1435
# Turns an image value into the `%{"type" => "Image", "url" => [%{"href" => url}]}` shape.
# A list is resolved through its first element; values without a "url" key give nil.
defp normalize_image(image) do
  case image do
    %{"url" => url} -> %{"type" => "Image", "url" => [%{"href" => url}]}
    images when is_list(images) -> images |> List.first() |> normalize_image()
    _ -> nil
  end
end
1445
# Converts a fetched actor object into the attribute map used to build a remote user.
#
# The actor document is remote input, so the list-shaped parts are read defensively:
# "attachment" and "tag" may be missing, null or a single object, and entries that do not
# have the expected shape are skipped instead of raising.
defp object_to_user_data(data) do
  # Profile fields are the "PropertyValue" attachments, reduced to name/value.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  # Custom emoji: shortcode (surrounding colons stripped) => image URL.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    pinned_objects: pinned_objects
  }

  # nickname can be nil because of virtual actors
  if data["preferredUsername"] do
    Map.put(
      user_data,
      :nickname,
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    )
  else
    Map.put(user_data, :nickname, nil)
  end
end
1525
@doc """
Fetches the user's remote following and follower collections and summarises them.

On success returns `{:ok, map}` with `:follower_count`, `:following_count`,
`:hide_follows` and `:hide_followers`. Non-integer `totalItems` values count as 0.
Any failing step is returned as an `{:error, _}` tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    summary = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_follows: follows_hidden,
      hide_followers: followers_hidden
    }

    {:ok, summary}
  else
    {:error, _} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1545
# Integer counters pass through; every other value counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1548
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

This happens only when external user synchronization is enabled, the `:type` entry is
"Person" or "Service", and both collection addresses are present. If one of those checks
is false, `user_data` is returned unchanged. Any other failure is logged as an error and
`user_data` is likewise returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): `object_to_user_data/1` in this module stores the actor type under
  # `:actor_type`, not `:type` — confirm this check can pass for maps built there.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, follow_info))
  else
    {skipped, false} when skipped in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1579
# Decides whether a follow collection is hidden.
#   * an embedded first page of a (Ordered)CollectionPage type -> not private
#   * a "first" reference is fetched: a page of those types -> not private;
#     a 401/403 response -> private; other failures come back as `{:error, _}`
#   * no "first" at all -> private
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first_page}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first_page) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1596
@doc """
Passes an actor object through `MRF.filter/1` and converts the accepted object into user
data. Anything other than `{:ok, data}` from the filter is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, accepted} -> {:ok, object_to_user_data(accepted)}
    rejected -> {:error, rejected}
  end
end
1604
@doc """
Fetches the actor at `ap_id` and turns it into user data, including follow information
where `maybe_update_follow_information/1` provides it.

Failures are returned as `{:error, reason}` after being logged: a deleted object at debug
level, an MRF rejection at info level, everything else at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      case reason do
        # A deleted object is expected, so it is only worth a debug line
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      {:error, reason}
  end
end
1624
@doc """
Frees up `data[:nickname]` when an existing user with a different ap_id already holds it,
by renaming that user to `"<id>.<nickname>"`.

When the existing user has the same ap_id, only an info line is logged. Returns nil when
there is no binary nickname or no user with that nickname.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]

  case is_binary(nickname) && User.get_by_nickname(nickname) do
    %User{} = old_user ->
      if data[:ap_id] == old_user.ap_id do
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        renamed = User.remote_user_changeset(old_user, %{nickname: "#{old_user.id}.#{old_user.nickname}"})
        User.update_and_set_cache(renamed)
      end

    _ ->
      nil
  end
end
1646
@doc """
Builds the pinned-objects map (`object ap_id => time of processing`) from a featured
collection.

An `OrderedCollection` with a "first" reference has that page fetched and its
"orderedItems" used; other `OrderedCollection` / `Collection` values are resolved through
`Collections.Fetcher.fetch_collection/1`.

Always returns a plain map. Fetch failures and unrecognised input are logged and yield
`%{}`, so callers can wrap the result in `{:ok, _}` without ending up with a nested tuple.
"""
def pin_data_from_featured_collection(%{
      "type" => "OrderedCollection",
      "first" => first
    }) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, page} ->
      page
      |> Map.get("orderedItems")
      |> pins_from_featured_items()

    e ->
      Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(
      %{
        "type" => type
      } = collection
    )
    when type in ["OrderedCollection", "Collection"] do
  case Collections.Fetcher.fetch_collection(collection) do
    {:ok, objects} ->
      pins_from_featured_items(objects)

    e ->
      Logger.error("Could not fetch featured collection, #{inspect(e)}")
      %{}
  end
end

def pin_data_from_featured_collection(other) do
  Logger.error("Could not parse featured collection #{inspect(other)}")
  %{}
end

# Maps collection items to `{ap_id, now}` pairs. Items may be objects carrying an "id"
# or bare id strings; anything else (and a missing item list) is skipped. The timestamp
# is taken per item, as before, so later items never get an earlier time.
defp pins_from_featured_items(items) do
  items
  |> List.wrap()
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) ->
      [{object_ap_id, NaiveDateTime.utc_now()}]

    object_ap_id when is_binary(object_ap_id) ->
      [{object_ap_id, NaiveDateTime.utc_now()}]

    _ ->
      []
  end)
  |> Map.new()
end
1671
@doc """
Fetches the featured collection at `ap_id` and returns `{:ok, pin_data}`.

A nil address, or a collection that cannot be fetched, gives `{:ok, %{}}`; the fetch
failure is logged as an error.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, collection} ->
      {:ok, pin_data_from_featured_collection(collection)}

    failure ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(failure)}")
      {:ok, %{}}
  end
end
1685
@doc """
Makes sure every pinned object of the given user data is available, taking it from the
object cache or fetching it by id. Returns `:ok` when all of them are, `:error` otherwise,
and nil for nil input.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
1698
@doc """
Creates or refreshes the user behind `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier. Otherwise the
actor is fetched, a background task is started to fetch its pinned objects, and the data is
either applied to the cached user or inserted as a new user (after resolving a nickname
clash). A failed fetch is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, user_data} ->
        {:ok, _pid} = Task.start(fn -> pinned_fetch_task(user_data) end)

        if cached_user do
          cached_user
          |> User.remote_user_changeset(user_data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(user_data)

          user_data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1723
@doc """
Resolves `nickname` through WebFinger and builds the user from the ap_id it reports.
Returns `{:error, "No AP id in WebFinger"}` when the lookup gives no ap_id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _ -> {:error, "No AP id in WebFinger"}
  end
end
1731
# Broken-thread filter: the activity passes only when its entire thread is visible to the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1736
# Post-processing check for a single activity; currently just the broken-thread filter.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1741
@doc """
Query for `Create` activities with "direct" visibility, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1748 end