[#3213] Speedup of HashtagsTableMigrator (query optimization). State handling fix.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Computes the recipients stored alongside an activity.
#
# Returns `{recipients, to, cc}`, where `to` and `cc` are read from `data`
# (defaulting to `[]` when the key is absent).
#
# "Create" activities additionally address their actor, and the combined
# list is de-duplicated. The actor is passed through `List.wrap/1` so that a
# missing (or `nil`) actor adds nothing, instead of injecting an empty list
# element (`[]`) into `recipients`.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = List.wrap(Map.get(data, "actor"))
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation of `to`, `cc` and `bcc`
# (the actor is not added and duplicates are kept).
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# Tells whether the actor behind `actor` (an AP id) may create activities.
# A missing actor passes; an unknown actor fails; a known one passes unless
# its `deactivated` flag is set.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
63
# Checks the embedded object's content against the configured
# `[:instance, :remote_limit]`. Activities without content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_), do: true
70
@doc """
Increases the actor's note count when `object` is public; otherwise returns
`{:ok, actor}` unchanged.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Decreases the actor's note count when `object` is public; otherwise returns
`{:ok, actor}` unchanged.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# Bumps the replies counter of the replied-to object, but only for public
# "Create" activities whose object carries an "inReplyTo". Returns `nil` for
# non-public replies and `:noop` for anything that is not a reply.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]
# Object types listed in @object_types are stored through `Object.create/1`.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Everything else is stored as an activity row; `meta` must carry `:local`.
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
@doc """
Inserts an activity built from `map`.

If `Activity.normalize/1` already knows the activity it is returned as is.
Otherwise the data goes through the actor check (unless
`bypass_actor_check`), the remote-limit check, MRF filtering and containment
before being stored. With `fake` set, nothing is written and an in-memory
activity with the id `"pleroma:fakeid"` is returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       {_, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(prepared["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored, local, recipients) do
    # Splice in the child object if we have one.
    activity_with_object = Maps.put_if_present(activity, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn ->
        Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity_with_object)
      end)
    end)

    {:ok, activity_with_object}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = error ->
      error

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error
  end
end
167
# Stores the activity row and, on success, schedules its expiration when the
# data carries one. An insert failure is returned untouched.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    other -> other
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# Enqueues a PurgeExpiredActivity job when the activity data holds a
# `%DateTime{}` under "expires_at". Any other activity passes through as
# `{:ok, activity}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Whatever either lookup returns on failure is passed back
# unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} = ok ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          ok

        other ->
          other
      end

    other ->
      other
  end
end
211
# Returns the freshly (force-)preloaded participations of a conversation, or
# an empty list when no `{:ok, conversation}` was produced.
defp get_participations({:ok, conversation}) do
  preloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(preloaded, :participations)
end

defp get_participations(_), do: []
219
# Preloads each participation's user and pushes the result to the
# "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
# Streams the participations of the conversation `object` belongs to, provided
# the context still holds a direct activity for `user`.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
# Only Create, Announce and Delete activities are streamed, to the topics
# computed for them; everything else is a no-op.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
# Runs the creation inside a DB transaction and unwraps a successful
# transaction result; a rolled-back transaction is returned as is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
267
# Builds and inserts a Create activity, then updates counters, notifies,
# streams and federates. Fake activities and benchmark-mode inserts stop early
# with `{:ok, activity}`; errors roll the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      extra
    )

  with {:ok, inserted} <- insert(create_data, local, fake),
       {:fake, false, inserted} <- {:fake, fake, inserted},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, inserted} <- {:quick_insert, quick_insert?, inserted},
       {:ok, _actor} <- increase_note_count_if_public(actor, inserted),
       _ <- notify_and_stream(inserted),
       :ok <- maybe_federate(inserted) do
    {:ok, inserted}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, quick_activity} ->
      {:ok, quick_activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
300
# Builds a listen activity from `params`, inserts it, then notifies, streams
# and federates it. Failures from insert or federation are returned as is.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      extra
    )

  case insert(listen_data, local) do
    {:ok, activity} ->
      notify_and_stream(activity)

      case maybe_federate(activity) do
        :ok -> {:ok, activity}
        other -> other
      end

    other ->
      other
  end
end
320
# Runs the unfollow inside a DB transaction and unwraps a successful
# transaction result.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    other -> other
  end
end
329
# Cancels the latest follow activity between the two users and inserts the
# matching unfollow activity. Returns `nil` when there is no follow to undo;
# errors roll the surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, unfollow_activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(unfollow_activity),
       :ok <- maybe_federate(unfollow_activity) do
    {:ok, unfollow_activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
343
# Runs the report creation inside a DB transaction and unwraps a successful
# transaction result.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
350
# Inserts a Flag (report) activity, federates a stripped copy of it and mails
# the report to every superuser that has an email address. The reported
# account is only addressed (in "cc") when forwarding is not explicitly
# disabled. Errors roll the surrounding transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
392
# Inserts a Move activity from `origin` to `target` and enqueues the
# "move_following" background job. The target must list the origin's AP id in
# its `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    other -> other
  end
end
418
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are limited to the public collection plus, when `opts[:user]` is
given, that user and the users they follow.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
448
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
455
# Returns the id of the newest direct activity in `context`, or `nil`.
# Preloading is skipped unless `opts` sets `:skip_preload` itself.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, :skip_preload, true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
466
# Fetches a page of activities addressed to the public collection. Any `:user`
# in `opts` is dropped before the query is built.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  [Constants.as_public()]
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
476
# Same as fetch_public_or_unlisted_activities/2, with `:restrict_unlisted`
# forced on.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only, pagination)
end
483
@valid_visibilities ~w[direct unlisted public private]

# Keeps only the activities whose computed visibility matches
# `opts[:visibility]` (a single value or a list of values).
#
# An invalid visibility is logged and the query is returned unrestricted,
# matching `exclude_visibility/2`. Previously these branches returned the
# result of `Logger.error/1` (`:ok`), which crashed the next stage of the
# query pipeline. The offending value is logged with `inspect/1` so that
# non-string values cannot raise during interpolation.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# A `nil` visibility means "no restriction requested" and falls through to
# the catch-all clause below, as it does in `exclude_visibility/2`.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
520
# Drops the activities whose computed visibility matches
# `opts[:exclude_visibilities]` (a single value or a list of values).
# Invalid values are logged and the query is returned untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn value -> value in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
563
# Applies the `thread_visibility` SQL function for the reading user, unless
# thread containment is skipped through the third argument or on the user.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
578
@doc """
Fetches activities authored by `user` as seen by `reading_user`, with no
restriction on activity type. The fetched list is reversed before returning.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
593
@doc """
Fetches the Create and Announce activities authored by `user` as seen by
`reading_user`. Block and mute filtering for the reader is only enabled when
the reader does not block `user`. The fetched list is reversed before
returning.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, pagination_type)
  |> Enum.reverse()
end
621
@doc """
Fetches Create and Announce activities visible to `reading_user` using offset
pagination. The fetched list is reversed before returning.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
633
# Recipient list used when listing a user's activities: empty in godmode,
# otherwise the public collection plus - for a logged-in reader - the reader
# and the users they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  case reading_user do
    absent when absent in [nil, false] -> [public]
    reader -> [public, reader.ap_id | User.following(reader)]
  end
end
643
# Hides Announce activities whose announced object was authored by the
# filtering user. Needs the joined object, so it raises when preloading is
# skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
663
# Keeps only activities with an id greater than `since_id`; an empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
671
# Rejects objects whose embedded "tag" array contains any of the given tags.
# Needs the joined object, so it raises when preloading is skipped.
defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject(query, _), do: query
689
# Keeps objects whose embedded "tag" array contains all of the given tags. A
# single binary tag is delegated to restrict_embedded_tag/2. Needs the joined
# object, so it raises when preloading is skipped.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
706
# Keeps objects whose embedded "tag" array contains any of the given tags.
# Needs the joined object, so it raises when preloading is skipped.
defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  )
end

defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag(query, %{tag: [tag]})
end

defp restrict_embedded_tag(query, _), do: query
723
# Returns `[tag_any, tag_all, tag_reject]`, each normalised to a list.
defp hashtag_conditions(opts) do
  for key <- [:tag, :tag_all, :tag_reject] do
    List.wrap(opts[key])
  end
end
729
# Applies the any / all / reject hashtag conditions in a single aggregated
# query (one left join on hashtags plus HAVING clauses).
# Note: times out on larger instances (with default timeout), intended for complex queries
defp restrict_hashtag_agg(query, opts) do
  conditions = hashtag_conditions(opts)
  [tag_any, tag_all, tag_reject] = conditions

  cond do
    not Enum.any?(conditions, &Enum.any?/1) ->
      query

    opts[:skip_preload] ->
      raise_on_missing_preload()

    true ->
      query
      |> group_by_all_bindings()
      |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
      |> maybe_restrict_hashtag_any(tag_any)
      |> maybe_restrict_hashtag_all(tag_all)
      |> maybe_restrict_hashtag_reject_any(tag_reject)
  end
end
751
# Groups by all bindings to allow aggregation on hashtags.
# The number of positional bindings is derived from the number of named
# bindings (aliases) present on the query.
defp group_by_all_bindings(query) do
  # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
  case Enum.count(query.aliases) do
    4 ->
      group_by(query, [a, o, b3, b4, b5], [a.id, o.id, b3.id, b4.id, b5.id])

    3 ->
      group_by(query, [a, o, b3, b4], [a.id, o.id, b3.id, b4.id])

    2 ->
      group_by(query, [a, o, b3], [a.id, o.id, b3.id])

    _ ->
      group_by(query, [a, o], [a.id, o.id])
  end
end
769
# HAVING clause: the aggregated hashtag names overlap with `tags`.
# An empty tag list adds nothing.
defp maybe_restrict_hashtag_any(query, []), do: query

defp maybe_restrict_hashtag_any(query, tags) do
  from([hashtag: hashtag] in query,
    having: fragment("array_agg(?) && (?)", hashtag.name, ^tags)
  )
end
781
# HAVING clause: the aggregated hashtag names contain every one of `tags`.
# An empty tag list adds nothing.
defp maybe_restrict_hashtag_all(query, []), do: query

defp maybe_restrict_hashtag_all(query, tags) do
  from([hashtag: hashtag] in query,
    having: fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
  )
end
793
# HAVING clause: the aggregated hashtag names do not overlap with `tags`.
# An empty tag list adds nothing.
defp maybe_restrict_hashtag_reject_any(query, []), do: query

defp maybe_restrict_hashtag_reject_any(query, tags) do
  from([hashtag: hashtag] in query,
    having: fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
  )
end
805
# Rejects objects carrying any of the given hashtags, by left-joining the
# hashtags and checking the aggregated names. Needs the joined object, so it
# raises when preloading is skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
  with_hashtags =
    query
    |> group_by_all_bindings()
    |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)

  from([hashtag: hashtag] in with_hashtags,
    having: fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
825
# Requires every given hashtag by applying restrict_hashtag_any/2 once per
# tag (one join per tag). Needs the joined object, so it raises when
# preloading is skipped.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
  Enum.reduce(tags, query, &restrict_hashtag_any(&2, %{tag: &1}))
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: tag})
end

defp restrict_hashtag_all(query, _), do: query
843
# Keeps objects carrying at least one of the given hashtags (inner join on
# the object's hashtags). Needs the joined object, so it raises when
# preloading is skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
  query
  |> join(:inner, [_activity, object], hashtag in assoc(object, :hashtags))
  # `...` skips to the last binding, i.e. the join added just above.
  |> where([..., hashtag], hashtag.name in ^tags)
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
861
# Raised by restrictions that need the joined child object while the caller
# asked to skip preloading.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
865
# Keeps activities addressed to any of `recipients`; with a user, the user's
# own activities are let through as well. An empty recipient list disables
# the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  # NOTE(review): `or_where` ORs with the whole WHERE expression accumulated
  # so far, not only with the recipients condition - confirm that earlier
  # restrictions are meant to be bypassed for the user's own activities.
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
879
# With `local_only: true`, keeps only locally created activities.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
885
# Keeps only activities authored by `actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
891
# Keeps only activities of the given type: a binary is compared for equality,
# any other value is bound as an array for `= ANY(?)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
901
# Keeps only activities whose data "state" equals `state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
907
# Keeps only activities whose object lists `ap_id` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
916
# With `only_media: true`, keeps only Create activities whose object has a
# non-empty "attachment". Needs the joined object, so it raises when
# preloading is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
930
# Reply filtering:
#   * `exclude_replies: true` drops every reply;
#   * `reply_visibility: "self"` keeps replies addressed to the filtering user;
#   * `reply_visibility: "following"` keeps replies addressed to the filtering
#     user or their friends, plus the user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
984
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
990
# Hides activities by muted users and activities addressed ("to") to muted
# users, except the muting user's own. Unless preloading is skipped, rows with
# a matching thread mute are hidden as well. `with_muted: true` disables the
# restriction entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1017
# Applies the blocking user's user blocks and domain blocks. The object is
# joined first when the query has no `:object` binding yet. Actors (and
# object authors) from blocked domains are still shown when the user follows
# them.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # Activities authored by blocked users.
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # Activities addressed to blocked users, unless authored by the user.
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # Activities addressed to users on blocked domains.
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  # Announces addressed ("to") to blocked users.
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # Actors on blocked domains, unless followed.
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Object authors on blocked domains, unless followed.
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1071
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end

defp restrict_unlisted(query, _), do: query
1085
# With `pinned: true`, keeps only the activities listed in
# `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
1091
# Hides Announce activities made by users whose reblogs the muting user has
# muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1108
# With a binary `:instance` option, keeps only activities whose actor URL has
# that value as its third '/'-separated segment (the host part).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _opts), do: query
1117
# Hides activities whose object content matches the regex returned by
# `Filter.compose_regex/1` for the user, except for activities authored by
# the user themselves. When no regex is composed the query is returned as is.
#
# The object is addressed through the named `:object` binding and joined first
# when the query does not carry it yet, so this clause does not depend on the
# object happening to be the second positional binding.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      query =
        if has_named_binding?(query, :object),
          do: query,
          else: Activity.with_joined_object(query)

      from([activity, object: o] in query,
        where:
          fragment("not(?->>'content' ~* ?)", o.data, ^regex) or
            activity.actor == ^user.ap_id
      )
  end
end

# Without a `:user`, the `:blocking_user` is used for the filter lookup.
defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1137
# Unless `include_poll_votes: true` is given, filters out activities whose
# object type is "Answer". This only applies when the object is already joined
# under the `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1149
# Unless `include_chat_messages: true` is given, filters out activities whose
# object type is "ChatMessage". This only applies when the object is already
# joined under the `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))

    false ->
      query
  end
end
1161
# Unless `invisible_actors: true` is given, removes activities whose actor is
# one of the users returned by the `invisible: true` user query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  hidden_actor_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^hidden_actor_ap_ids)
end
1172
# Leaves out the activity with the given binary id when `:exclude_id` is set.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1178
# Preloads objects via `Activity.with_preloaded_object/1` unless
# `skip_preload: true` is requested.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1185
# Preloads bookmarks for `opts[:user]` unless `skip_preload: true` is requested.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1192
# Preloads report notes only when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1199
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is requested.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1206
# Applies an explicit ordering by id when `:order` is `:desc` or `:asc`;
# any other options leave the query unordered by this step.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1218
# Loads the muting user's outgoing relationship ap_ids in one call and returns
# three option maps (for blocked, muted and reblog-muted restrictions), each
# being `opts` plus the matching preloaded list. Keys already present in
# `opts` take precedence over the preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  # Blocks are only preloaded together with mutes when both roles are the same user.
  relationships =
    if blocking_user && blocking_user == muting_user,
      do: [:block | mute_relationships],
      else: mute_relationships

  ap_ids = User.outgoing_relationships_ap_ids(muting_user, relationships)

  blocked_opts = Map.merge(%{blocked_users_ap_ids: ap_ids[:block]}, opts)
  muted_opts = Map.merge(%{muted_users_ap_ids: ap_ids[:mute]}, opts)
  reblog_muted_opts = Map.merge(%{reblog_muted_users_ap_ids: ap_ids[:reblog_mute]}, opts)

  {blocked_opts, muted_opts, reblog_muted_opts}
end
1240
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion step driven by `opts`.

Hashtag handling depends on `Config.improved_hashtag_timeline()`:

  * falsy - the embedded-tag restrictions are applied;
  * `:prefer_aggregation` - `restrict_hashtag_agg/2` is applied;
  * anything else - the query is made distinct and the any/all/reject
    hashtag restrictions are applied.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: repeating it only duplicates the same WHERE
    # condition and composes the filter regex a second time.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  hashtag_timeline_strategy = Config.improved_hashtag_timeline()

  cond do
    !hashtag_timeline_strategy ->
      query
      |> restrict_embedded_tag(opts)
      |> restrict_embedded_tag_reject(opts)
      |> restrict_embedded_tag_all(opts)

    hashtag_timeline_strategy == :prefer_aggregation ->
      restrict_hashtag_agg(query, opts)

    true ->
      query
      |> distinct([activity], true)
      |> restrict_hashtag_any(opts)
      |> restrict_hashtag_all(opts)
      |> restrict_hashtag_reject_any(opts)
  end
end
1302
@doc """
Fetches one page of activities addressed to `recipients` or to any of the
list memberships of `opts[:user]`.

The paginated result is reversed before being returned, and activities that
reached the user through a list `bcc` get the user's ap_id added to `cc`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, user)
end
1311
@doc """
Returns the activities liked by `user`, ordered by the id of the `Like`
activity (descending, nulls last) rather than by the id of the liked activity.

Each returned activity carries its object and uses the `Like` id as
`pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering above must be kept, so pagination is told not to add its own.
  pagination_params = Map.merge(params, %{skip_order: true})

  Pagination.fetch_paginated(likes_query, pagination_params, pagination)
end
1329
# For a user with list memberships, adds the user's ap_id to the `cc` of every
# activity whose non-empty `bcc` contains one of those memberships. All other
# activities, and calls without memberships or without a user, pass through
# unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `cc` may be missing or a bare value; wrapping keeps the result a
        # proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1345
# Keeps activities whose recipients overlap `recipients`, or overlap
# `recipients_with_public` while also containing `Constants.as_public()`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public in activity.recipients)
  )
end
1354
@doc """
Fetches one page of activities using the bounded recipient condition: the
base query is built with an empty recipient list, and the recipient match is
added by `fetch_activities_bounded_query/3`. The page is reversed before
being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1366
@doc """
Stores `file` through `Upload.store/2` and inserts an object holding the
resulting data, with `"actor"` set from `opts[:actor]` when present.

Any non-`{:ok, data}` result of the store step is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1375
# Extracts the actor URL from the shapes the `url` property can take: a plain
# string, a map with a binary "href", or a list whose first element is one of
# those. Anything else, including an empty list, yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1387
# Converts a string-keyed actor object into the atom-keyed map of user
# attributes used when creating or updating a remote user.
#
# Avatar, banner, profile fields, emoji, lock state and capabilities are read
# from the object as received; the remaining attributes are read after the
# object has gone through `Transmogrifier.maybe_fix_user_object/1`.
#
# Malformed `attachment` / `tag` entries (not a map, or missing the keys that
# are read below) are skipped instead of raising, since the object comes from
# a remote server.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  fields =
    data
    |> Map.get("attachment", [])
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(fn field -> Map.take(field, ["name", "value"]) end)

  emojis =
    data
    |> Map.get("tag", [])
    |> Enum.filter(fn
      # Only keep tags carrying everything the mapping step below reads.
      %{"type" => "Emoji", "icon" => %{"url" => _url}, "name" => name} when is_binary(name) ->
        true

      _ ->
        false
    end)
    |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
      {String.trim(name, ":"), url}
    end)

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  if data["preferredUsername"] do
    Map.put(
      user_data,
      :nickname,
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    )
  else
    Map.put(user_data, :nickname, nil)
  end
end
1476
@doc """
Fetches the remote following and follower collections of `user` and returns
`{:ok, map}` with the two counters (`totalItems`, 0 when not an integer) and
whether each collection is private.

Any failing step yields `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1496
# Returns the counter unchanged when it is an integer, and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1499
@doc """
Adds remotely fetched follow information to `user_data` under `:info`.

The fetch only happens when `[:instance, :external_user_synchronization]` is
enabled, the actor type is `"Person"` or `"Service"`, and both the following
and follower addresses are present. In every other case, and when the fetch
fails (which is logged), `user_data` is returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # The user data built in this module carries the type under :actor_type;
  # :type is still honoured for callers that pass it.
  actor_type = user_data[:actor_type] || user_data[:type]

  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, actor_type in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, info} <-
         fetch_follow_information_for_user(user_data) do
    # NOTE(review): the result is stored under :info; confirm the consumer of
    # this map still reads that key.
    info = Map.merge(user_data[:info] || %{}, info)

    user_data
    |> Map.put(:info, info)
  else
    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    {:enabled, false} ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1530
# Decides whether a collection is private:
#   * an embedded "first" page of a collection-page type -> not private;
#   * any other "first" value is fetched: a collection page -> not private,
#     a 401/403 response -> private, any other outcome -> {:error, reason};
#   * no "first" at all -> private.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1547
@doc """
Runs the actor object through `MRF.filter/1` and, when it passes, converts it
into user data. Any other filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1555
@doc """
Fetches the actor at `ap_id`, converts it into user data and, where
applicable, enriches it with follow information.

Errors are returned as `{:error, reason}` and logged at a level that depends
on the reason: debug for deleted objects, info for rejections, error otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted object is an expected outcome, so it is only logged at debug level.
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, rejection}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1575
@doc """
Frees up `data[:nickname]` when another user with a different ap_id already
holds it, by renaming that user to `"<id>.<nickname>"`.

When the existing user has the same ap_id nothing is changed and the
situation is only logged. Returns `nil` when there is no binary nickname or
no existing user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = existing <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == existing.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{existing.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = User.remote_user_changeset(existing, %{nickname: "#{existing.id}.#{existing.nickname}"})
    User.update_and_set_cache(renamed)
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1599
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the actor is fetched: a cached user is updated with the fetched
data, and a new user is inserted after any clashing nickname is handled.
A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1622
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user for
the resulting ap_id. Returns `{:error, "No AP id in WebFinger"}` when the
lookup does not yield a non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1630
# Broken-thread filter: the result of `entire_thread_visible_for_user?/2`
# for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)

# Post-processing applied to a single activity; currently only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1640
@doc """
Query for `Create` activities with direct visibility, ordered by ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1647 end