0609827ecf0a1c024c9b3025b0770d370c9529e8
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{deactivated: deactivated} -> not deactivated
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
382 superuser
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
385 end)
386
387 {:ok, activity}
388 else
389 {:error, error} -> Repo.rollback(error)
390 end
391 end
392
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
395 params = %{
396 "type" => "Move",
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
400 }
401
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
406
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
410 })
411
412 {:ok, activity}
413 else
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
415 err -> err
416 end
417 end
418
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
421
422 recipients =
423 if opts[:user],
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
425 else: public
426
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
434 |> where(
435 [activity],
436 fragment(
437 "?->>'type' = ? and ?->>'context' = ?",
438 activity.data,
439 "Create",
440 activity.data,
441 ^context
442 )
443 )
444 |> exclude_poll_votes(opts)
445 |> exclude_id(opts)
446 |> order_by([activity], desc: activity.id)
447 end
448
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(opts)
453 |> Repo.all()
454 end
455
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 context
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
462 |> limit(1)
463 |> select([a], a.id)
464 |> Repo.one()
465 end
466
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
470
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
475 end
476
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 opts
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
482 end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 from(
490 a in query,
491 where:
492 fragment(
493 "activity_visibility(?, ?, ?) = ANY (?)",
494 a.actor,
495 a.recipients,
496 a.data,
497 ^visibility
498 )
499 )
500 else
501 Logger.error("Could not restrict visibility to #{visibility}")
502 end
503 end
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
507 from(
508 a in query,
509 where:
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
511 )
512 end
513
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518
519 defp restrict_visibility(query, _visibility), do: query
520
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 not fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not exclude visibility to #{visibility}")
537 query
538 end
539 end
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ?",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561
562 defp exclude_visibility(query, _visibility), do: query
563
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
565 do: query
566
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
568 do: query
569
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
571 from(
572 a in query,
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
574 )
575 end
576
577 defp restrict_thread_visibility(query, _, _), do: query
578
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
580 params =
581 params
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584
585 %{
586 godmode: params[:godmode],
587 reading_user: reading_user
588 }
589 |> user_activities_recipients()
590 |> fetch_activities(params)
591 |> Enum.reverse()
592 end
593
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
595 params =
596 params
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
601
602 params =
603 if User.blocks?(reading_user, user) do
604 params
605 else
606 params
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
609 end
610
611 pagination_type = Map.get(params, :pagination_type) || :keyset
612
613 %{
614 godmode: params[:godmode],
615 reading_user: reading_user
616 }
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
619 |> Enum.reverse()
620 end
621
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
624
625 %{
626 godmode: params[:godmode],
627 reading_user: reading_user
628 }
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
631 |> Enum.reverse()
632 end
633
634 defp user_activities_recipients(%{godmode: true}), do: []
635
636 defp user_activities_recipients(%{reading_user: reading_user}) do
637 if reading_user do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
639 else
640 [Constants.as_public()]
641 end
642 end
643
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
646 end
647
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
649 from(
650 [activity, object] in query,
651 where:
652 fragment(
653 "?->>'type' != ? or ?->>'actor' != ?",
654 activity.data,
655 "Announce",
656 object.data,
657 ^actor
658 )
659 )
660 end
661
662 defp restrict_announce_object_actor(query, _), do: query
663
664 defp restrict_since(query, %{since_id: ""}), do: query
665
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
668 end
669
670 defp restrict_since(query, _), do: query
671
672 defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
673 raise_on_missing_preload()
674 end
675
676 defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
677 from(
678 [_activity, object] in query,
679 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
680 )
681 end
682
683 defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
684 when is_binary(tag_reject) do
685 restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
686 end
687
688 defp restrict_embedded_tag_reject(query, _), do: query
689
690 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
691 raise_on_missing_preload()
692 end
693
694 defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
695 from(
696 [_activity, object] in query,
697 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
698 )
699 end
700
701 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
702 restrict_embedded_tag(query, %{tag: tag})
703 end
704
705 defp restrict_embedded_tag_all(query, _), do: query
706
707 defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do
708 raise_on_missing_preload()
709 end
710
711 defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do
712 from(
713 [_activity, object] in query,
714 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
715 )
716 end
717
718 defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do
719 restrict_embedded_tag(query, %{tag: [tag]})
720 end
721
722 defp restrict_embedded_tag(query, _), do: query
723
724 defp hashtag_conditions(opts) do
725 [:tag, :tag_all, :tag_reject]
726 |> Enum.map(&opts[&1])
727 |> Enum.map(&List.wrap(&1))
728 end
729
730 defp restrict_hashtag_agg(query, opts) do
731 [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
732 has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
733
734 cond do
735 !has_conditions ->
736 query
737
738 opts[:skip_preload] ->
739 raise_on_missing_preload()
740
741 true ->
742 query
743 |> group_by_all_bindings()
744 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
745 |> maybe_restrict_hashtag_any(tag_any)
746 |> maybe_restrict_hashtag_all(tag_all)
747 |> maybe_restrict_hashtag_reject_any(tag_reject)
748 end
749 end
750
751 # Groups by all bindings to allow aggregation on hashtags
752 defp group_by_all_bindings(query) do
753 # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
754 cond do
755 Enum.count(query.aliases) == 4 ->
756 from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
757
758 Enum.count(query.aliases) == 3 ->
759 from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
760
761 Enum.count(query.aliases) == 2 ->
762 from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
763
764 true ->
765 from([a, o] in query, group_by: [a.id, o.id])
766 end
767 end
768
769 defp maybe_restrict_hashtag_any(query, []) do
770 query
771 end
772
773 defp maybe_restrict_hashtag_any(query, tags) do
774 having(
775 query,
776 [hashtag: hashtag],
777 fragment("array_agg(?) && (?)", hashtag.name, ^tags)
778 )
779 end
780
781 defp maybe_restrict_hashtag_all(query, []) do
782 query
783 end
784
785 defp maybe_restrict_hashtag_all(query, tags) do
786 having(
787 query,
788 [hashtag: hashtag],
789 fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
790 )
791 end
792
793 defp maybe_restrict_hashtag_reject_any(query, []) do
794 query
795 end
796
797 defp maybe_restrict_hashtag_reject_any(query, tags) do
798 having(
799 query,
800 [hashtag: hashtag],
801 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
802 )
803 end
804
805 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
806 raise_on_missing_preload()
807 end
808
809 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
810 query
811 |> group_by_all_bindings()
812 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
813 |> having(
814 [hashtag: hashtag],
815 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
816 )
817 end
818
819 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
820 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
821 end
822
823 defp restrict_hashtag_reject_any(query, _), do: query
824
825 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
826 raise_on_missing_preload()
827 end
828
829 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
830 Enum.reduce(
831 tags,
832 query,
833 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
834 )
835 end
836
837 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
838 restrict_hashtag_any(query, %{tag: tag})
839 end
840
841 defp restrict_hashtag_all(query, _), do: query
842
843 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
844 raise_on_missing_preload()
845 end
846
847 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
848 from(
849 [_activity, object] in query,
850 join: hashtag in assoc(object, :hashtags),
851 where: hashtag.name in ^tags
852 )
853 end
854
855 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
856 restrict_hashtag_any(query, %{tag: [tag]})
857 end
858
859 defp restrict_hashtag_any(query, _), do: query
860
861 defp raise_on_missing_preload do
862 raise "Can't use the child object without preloading!"
863 end
864
865 defp restrict_recipients(query, [], _user), do: query
866
867 defp restrict_recipients(query, recipients, nil) do
868 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
869 end
870
871 defp restrict_recipients(query, recipients, user) do
872 from(
873 activity in query,
874 where: fragment("? && ?", ^recipients, activity.recipients),
875 or_where: activity.actor == ^user.ap_id
876 )
877 end
878
879 defp restrict_local(query, %{local_only: true}) do
880 from(activity in query, where: activity.local == true)
881 end
882
883 defp restrict_local(query, _), do: query
884
885 defp restrict_actor(query, %{actor_id: actor_id}) do
886 from(activity in query, where: activity.actor == ^actor_id)
887 end
888
889 defp restrict_actor(query, _), do: query
890
891 defp restrict_type(query, %{type: type}) when is_binary(type) do
892 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
893 end
894
895 defp restrict_type(query, %{type: type}) do
896 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
897 end
898
899 defp restrict_type(query, _), do: query
900
901 defp restrict_state(query, %{state: state}) do
902 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
903 end
904
905 defp restrict_state(query, _), do: query
906
907 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
908 from(
909 [_activity, object] in query,
910 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
911 )
912 end
913
914 defp restrict_favorited_by(query, _), do: query
915
916 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
917 raise "Can't use the child object without preloading!"
918 end
919
920 defp restrict_media(query, %{only_media: true}) do
921 from(
922 [activity, object] in query,
923 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
924 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
925 )
926 end
927
928 defp restrict_media(query, _), do: query
929
930 defp restrict_replies(query, %{exclude_replies: true}) do
931 from(
932 [_activity, object] in query,
933 where: fragment("?->>'inReplyTo' is null", object.data)
934 )
935 end
936
937 defp restrict_replies(query, %{
938 reply_filtering_user: %User{} = user,
939 reply_visibility: "self"
940 }) do
941 from(
942 [activity, object] in query,
943 where:
944 fragment(
945 "?->>'inReplyTo' is null OR ? = ANY(?)",
946 object.data,
947 ^user.ap_id,
948 activity.recipients
949 )
950 )
951 end
952
953 defp restrict_replies(query, %{
954 reply_filtering_user: %User{} = user,
955 reply_visibility: "following"
956 }) do
957 from(
958 [activity, object] in query,
959 where:
960 fragment(
961 """
962 ?->>'type' != 'Create' -- This isn't a Create
963 OR ?->>'inReplyTo' is null -- this isn't a reply
964 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
965 -- unless they are the author (because authors
966 -- are also part of the recipients). This leads
967 -- to a bug that self-replies by friends won't
968 -- show up.
969 OR ? = ? -- The actor is us
970 """,
971 activity.data,
972 object.data,
973 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
974 activity.recipients,
975 activity.actor,
976 activity.actor,
977 ^user.ap_id
978 )
979 )
980 end
981
982 defp restrict_replies(query, _), do: query
983
984 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
985 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
986 end
987
988 defp restrict_reblogs(query, _), do: query
989
990 defp restrict_muted(query, %{with_muted: true}), do: query
991
992 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
993 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
994
995 query =
996 from([activity] in query,
997 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
998 where:
999 fragment(
1000 "not (?->'to' \\?| ?) or ? = ?",
1001 activity.data,
1002 ^mutes,
1003 activity.actor,
1004 ^user.ap_id
1005 )
1006 )
1007
1008 unless opts[:skip_preload] do
1009 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1010 else
1011 query
1012 end
1013 end
1014
1015 defp restrict_muted(query, _), do: query
1016
1017 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1018 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1019 domain_blocks = user.domain_blocks || []
1020
1021 following_ap_ids = User.get_friends_ap_ids(user)
1022
1023 query =
1024 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1025
1026 from(
1027 [activity, object: o] in query,
1028 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1029 where:
1030 fragment(
1031 "((not (? && ?)) or ? = ?)",
1032 activity.recipients,
1033 ^blocked_ap_ids,
1034 activity.actor,
1035 ^user.ap_id
1036 ),
1037 where:
1038 fragment(
1039 "recipients_contain_blocked_domains(?, ?) = false",
1040 activity.recipients,
1041 ^domain_blocks
1042 ),
1043 where:
1044 fragment(
1045 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1046 activity.data,
1047 activity.data,
1048 ^blocked_ap_ids
1049 ),
1050 where:
1051 fragment(
1052 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1053 activity.actor,
1054 ^domain_blocks,
1055 activity.actor,
1056 ^following_ap_ids
1057 ),
1058 where:
1059 fragment(
1060 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1061 o.data,
1062 ^domain_blocks,
1063 o.data,
1064 ^following_ap_ids
1065 )
1066 )
1067 end
1068
1069 defp restrict_blocked(query, _), do: query
1070
1071 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1072 from(
1073 activity in query,
1074 where:
1075 fragment(
1076 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1077 activity.data,
1078 ^[Constants.as_public()]
1079 )
1080 )
1081 end
1082
1083 defp restrict_unlisted(query, _), do: query
1084
1085 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1086 from(activity in query, where: activity.id in ^ids)
1087 end
1088
1089 defp restrict_pinned(query, _), do: query
1090
1091 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1092 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1093
1094 from(
1095 activity in query,
1096 where:
1097 fragment(
1098 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1099 activity.data,
1100 activity.actor,
1101 ^muted_reblogs
1102 )
1103 )
1104 end
1105
1106 defp restrict_muted_reblogs(query, _), do: query
1107
1108 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1109 from(
1110 activity in query,
1111 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1112 )
1113 end
1114
1115 defp restrict_instance(query, _), do: query
1116
1117 defp restrict_filtered(query, %{user: %User{} = user}) do
1118 case Filter.compose_regex(user) do
1119 nil ->
1120 query
1121
1122 regex ->
1123 from([activity, object] in query,
1124 where:
1125 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1126 activity.actor == ^user.ap_id
1127 )
1128 end
1129 end
1130
1131 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1132 restrict_filtered(query, %{user: user})
1133 end
1134
1135 defp restrict_filtered(query, _), do: query
1136
1137 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1138
1139 defp exclude_poll_votes(query, _) do
1140 if has_named_binding?(query, :object) do
1141 from([activity, object: o] in query,
1142 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1143 )
1144 else
1145 query
1146 end
1147 end
1148
1149 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1150
1151 defp exclude_chat_messages(query, _) do
1152 if has_named_binding?(query, :object) do
1153 from([activity, object: o] in query,
1154 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1155 )
1156 else
1157 query
1158 end
1159 end
1160
1161 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1162
1163 defp exclude_invisible_actors(query, _opts) do
1164 invisible_ap_ids =
1165 User.Query.build(%{invisible: true, select: [:ap_id]})
1166 |> Repo.all()
1167 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1168
1169 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1170 end
1171
1172 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1173 from(activity in query, where: activity.id != ^id)
1174 end
1175
1176 defp exclude_id(query, _), do: query
1177
1178 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1179
1180 defp maybe_preload_objects(query, _) do
1181 query
1182 |> Activity.with_preloaded_object()
1183 end
1184
1185 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1186
1187 defp maybe_preload_bookmarks(query, opts) do
1188 query
1189 |> Activity.with_preloaded_bookmark(opts[:user])
1190 end
1191
1192 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1193 query
1194 |> Activity.with_preloaded_report_notes()
1195 end
1196
1197 defp maybe_preload_report_notes(query, _), do: query
1198
1199 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1200
1201 defp maybe_set_thread_muted_field(query, opts) do
1202 query
1203 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1204 end
1205
1206 defp maybe_order(query, %{order: :desc}) do
1207 query
1208 |> order_by(desc: :id)
1209 end
1210
1211 defp maybe_order(query, %{order: :asc}) do
1212 query
1213 |> order_by(asc: :id)
1214 end
1215
1216 defp maybe_order(query, _), do: query
1217
1218 defp fetch_activities_query_ap_ids_ops(opts) do
1219 source_user = opts[:muting_user]
1220 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1221
1222 ap_id_relationships =
1223 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1224 [:block | ap_id_relationships]
1225 else
1226 ap_id_relationships
1227 end
1228
1229 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1230
1231 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1232 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1233
1234 restrict_muted_reblogs_opts =
1235 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1236
1237 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1238 end
1239
1240 def fetch_activities_query(recipients, opts \\ %{}) do
1241 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1242 fetch_activities_query_ap_ids_ops(opts)
1243
1244 config = %{
1245 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1246 }
1247
1248 query =
1249 Activity
1250 |> maybe_preload_objects(opts)
1251 |> maybe_preload_bookmarks(opts)
1252 |> maybe_preload_report_notes(opts)
1253 |> maybe_set_thread_muted_field(opts)
1254 |> maybe_order(opts)
1255 |> restrict_recipients(recipients, opts[:user])
1256 |> restrict_replies(opts)
1257 |> restrict_since(opts)
1258 |> restrict_local(opts)
1259 |> restrict_actor(opts)
1260 |> restrict_type(opts)
1261 |> restrict_state(opts)
1262 |> restrict_favorited_by(opts)
1263 |> restrict_blocked(restrict_blocked_opts)
1264 |> restrict_muted(restrict_muted_opts)
1265 |> restrict_filtered(opts)
1266 |> restrict_media(opts)
1267 |> restrict_visibility(opts)
1268 |> restrict_thread_visibility(opts, config)
1269 |> restrict_reblogs(opts)
1270 |> restrict_pinned(opts)
1271 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1272 |> restrict_instance(opts)
1273 |> restrict_announce_object_actor(opts)
1274 |> restrict_filtered(opts)
1275 |> Activity.restrict_deactivated_users()
1276 |> exclude_poll_votes(opts)
1277 |> exclude_chat_messages(opts)
1278 |> exclude_invisible_actors(opts)
1279 |> exclude_visibility(opts)
1280
1281 hashtag_timeline_strategy = Config.improved_hashtag_timeline()
1282
1283 cond do
1284 !hashtag_timeline_strategy ->
1285 query
1286 |> restrict_embedded_tag(opts)
1287 |> restrict_embedded_tag_reject(opts)
1288 |> restrict_embedded_tag_all(opts)
1289
1290 hashtag_timeline_strategy == :prefer_aggregation ->
1291 restrict_hashtag_agg(query, opts)
1292
1293 hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) ->
1294 query
1295 |> distinct([activity], true)
1296 |> restrict_hashtag_any(opts)
1297 |> restrict_hashtag_all(opts)
1298 |> restrict_hashtag_reject_any(opts)
1299
1300 true ->
1301 restrict_hashtag_agg(query, opts)
1302 end
1303 end
1304
1305 defp avoid_hashtags_aggregation?(opts) do
1306 [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
1307
1308 joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0
1309 Enum.empty?(tag_reject) and joins_count <= 2
1310 end
1311
1312 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1313 list_memberships = Pleroma.List.memberships(opts[:user])
1314
1315 fetch_activities_query(recipients ++ list_memberships, opts)
1316 |> Pagination.fetch_paginated(opts, pagination)
1317 |> Enum.reverse()
1318 |> maybe_update_cc(list_memberships, opts[:user])
1319 end
1320
1321 @doc """
1322 Fetch favorites activities of user with order by sort adds to favorites
1323 """
1324 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1325 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1326 user.ap_id
1327 |> Activity.Queries.by_actor()
1328 |> Activity.Queries.by_type("Like")
1329 |> Activity.with_joined_object()
1330 |> Object.with_joined_activity()
1331 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1332 |> order_by([like, _, _], desc_nulls_last: like.id)
1333 |> Pagination.fetch_paginated(
1334 Map.merge(params, %{skip_order: true}),
1335 pagination
1336 )
1337 end
1338
1339 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1340 Enum.map(activities, fn
1341 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1342 if Enum.any?(bcc, &(&1 in list_memberships)) do
1343 update_in(activity.data["cc"], &[user_ap_id | &1])
1344 else
1345 activity
1346 end
1347
1348 activity ->
1349 activity
1350 end)
1351 end
1352
1353 defp maybe_update_cc(activities, _, _), do: activities
1354
1355 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1356 from(activity in query,
1357 where:
1358 fragment("? && ?", activity.recipients, ^recipients) or
1359 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1360 ^Constants.as_public() in activity.recipients)
1361 )
1362 end
1363
1364 def fetch_activities_bounded(
1365 recipients,
1366 recipients_with_public,
1367 opts \\ %{},
1368 pagination \\ :keyset
1369 ) do
1370 fetch_activities_query([], opts)
1371 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1372 |> Pagination.fetch_paginated(opts, pagination)
1373 |> Enum.reverse()
1374 end
1375
1376 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1377 def upload(file, opts \\ []) do
1378 with {:ok, data} <- Upload.store(file, opts) do
1379 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1380
1381 Repo.insert(%Object{data: obj_data})
1382 end
1383 end
1384
1385 @spec get_actor_url(any()) :: binary() | nil
1386 defp get_actor_url(url) when is_binary(url), do: url
1387 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1388
1389 defp get_actor_url(url) when is_list(url) do
1390 url
1391 |> List.first()
1392 |> get_actor_url()
1393 end
1394
1395 defp get_actor_url(_url), do: nil
1396
1397 defp object_to_user_data(data) do
1398 avatar =
1399 data["icon"]["url"] &&
1400 %{
1401 "type" => "Image",
1402 "url" => [%{"href" => data["icon"]["url"]}]
1403 }
1404
1405 banner =
1406 data["image"]["url"] &&
1407 %{
1408 "type" => "Image",
1409 "url" => [%{"href" => data["image"]["url"]}]
1410 }
1411
1412 fields =
1413 data
1414 |> Map.get("attachment", [])
1415 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1416 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1417
1418 emojis =
1419 data
1420 |> Map.get("tag", [])
1421 |> Enum.filter(fn
1422 %{"type" => "Emoji"} -> true
1423 _ -> false
1424 end)
1425 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1426 {String.trim(name, ":"), url}
1427 end)
1428
1429 is_locked = data["manuallyApprovesFollowers"] || false
1430 capabilities = data["capabilities"] || %{}
1431 accepts_chat_messages = capabilities["acceptsChatMessages"]
1432 data = Transmogrifier.maybe_fix_user_object(data)
1433 is_discoverable = data["discoverable"] || false
1434 invisible = data["invisible"] || false
1435 actor_type = data["type"] || "Person"
1436
1437 public_key =
1438 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1439 data["publicKey"]["publicKeyPem"]
1440 else
1441 nil
1442 end
1443
1444 shared_inbox =
1445 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1446 data["endpoints"]["sharedInbox"]
1447 else
1448 nil
1449 end
1450
1451 user_data = %{
1452 ap_id: data["id"],
1453 uri: get_actor_url(data["url"]),
1454 ap_enabled: true,
1455 banner: banner,
1456 fields: fields,
1457 emoji: emojis,
1458 is_locked: is_locked,
1459 is_discoverable: is_discoverable,
1460 invisible: invisible,
1461 avatar: avatar,
1462 name: data["name"],
1463 follower_address: data["followers"],
1464 following_address: data["following"],
1465 bio: data["summary"] || "",
1466 actor_type: actor_type,
1467 also_known_as: Map.get(data, "alsoKnownAs", []),
1468 public_key: public_key,
1469 inbox: data["inbox"],
1470 shared_inbox: shared_inbox,
1471 accepts_chat_messages: accepts_chat_messages
1472 }
1473
1474 # nickname can be nil because of virtual actors
1475 if data["preferredUsername"] do
1476 Map.put(
1477 user_data,
1478 :nickname,
1479 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1480 )
1481 else
1482 Map.put(user_data, :nickname, nil)
1483 end
1484 end
1485
1486 def fetch_follow_information_for_user(user) do
1487 with {:ok, following_data} <-
1488 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1489 {:ok, hide_follows} <- collection_private(following_data),
1490 {:ok, followers_data} <-
1491 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1492 {:ok, hide_followers} <- collection_private(followers_data) do
1493 {:ok,
1494 %{
1495 hide_follows: hide_follows,
1496 follower_count: normalize_counter(followers_data["totalItems"]),
1497 following_count: normalize_counter(following_data["totalItems"]),
1498 hide_followers: hide_followers
1499 }}
1500 else
1501 {:error, _} = e -> e
1502 e -> {:error, e}
1503 end
1504 end
1505
1506 defp normalize_counter(counter) when is_integer(counter), do: counter
1507 defp normalize_counter(_), do: 0
1508
1509 def maybe_update_follow_information(user_data) do
1510 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1511 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1512 {_, true} <-
1513 {:collections_available,
1514 !!(user_data[:following_address] && user_data[:follower_address])},
1515 {:ok, info} <-
1516 fetch_follow_information_for_user(user_data) do
1517 info = Map.merge(user_data[:info] || %{}, info)
1518
1519 user_data
1520 |> Map.put(:info, info)
1521 else
1522 {:user_type_check, false} ->
1523 user_data
1524
1525 {:collections_available, false} ->
1526 user_data
1527
1528 {:enabled, false} ->
1529 user_data
1530
1531 e ->
1532 Logger.error(
1533 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1534 )
1535
1536 user_data
1537 end
1538 end
1539
1540 defp collection_private(%{"first" => %{"type" => type}})
1541 when type in ["CollectionPage", "OrderedCollectionPage"],
1542 do: {:ok, false}
1543
1544 defp collection_private(%{"first" => first}) do
1545 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1546 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1547 {:ok, false}
1548 else
1549 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1550 {:error, _} = e -> e
1551 e -> {:error, e}
1552 end
1553 end
1554
1555 defp collection_private(_data), do: {:ok, true}
1556
1557 def user_data_from_user_object(data) do
1558 with {:ok, data} <- MRF.filter(data) do
1559 {:ok, object_to_user_data(data)}
1560 else
1561 e -> {:error, e}
1562 end
1563 end
1564
1565 def fetch_and_prepare_user_from_ap_id(ap_id) do
1566 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1567 {:ok, data} <- user_data_from_user_object(data) do
1568 {:ok, maybe_update_follow_information(data)}
1569 else
1570 # If this has been deleted, only log a debug and not an error
1571 {:error, "Object has been deleted" = e} ->
1572 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1573 {:error, e}
1574
1575 {:error, {:reject, reason} = e} ->
1576 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1577 {:error, e}
1578
1579 {:error, e} ->
1580 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1581 {:error, e}
1582 end
1583 end
1584
1585 def maybe_handle_clashing_nickname(data) do
1586 with nickname when is_binary(nickname) <- data[:nickname],
1587 %User{} = old_user <- User.get_by_nickname(nickname),
1588 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1589 Logger.info(
1590 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1591 data[:ap_id]
1592 }, renaming."
1593 )
1594
1595 old_user
1596 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1597 |> User.update_and_set_cache()
1598 else
1599 {:ap_id_comparison, true} ->
1600 Logger.info(
1601 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1602 )
1603
1604 _ ->
1605 nil
1606 end
1607 end
1608
1609 def make_user_from_ap_id(ap_id) do
1610 user = User.get_cached_by_ap_id(ap_id)
1611
1612 if user && !User.ap_enabled?(user) do
1613 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1614 else
1615 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1616 if user do
1617 user
1618 |> User.remote_user_changeset(data)
1619 |> User.update_and_set_cache()
1620 else
1621 maybe_handle_clashing_nickname(data)
1622
1623 data
1624 |> User.remote_user_changeset()
1625 |> Repo.insert()
1626 |> User.set_cache()
1627 end
1628 end
1629 end
1630 end
1631
1632 def make_user_from_nickname(nickname) do
1633 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1634 make_user_from_ap_id(ap_id)
1635 else
1636 _e -> {:error, "No AP id in WebFinger"}
1637 end
1638 end
1639
1640 # filter out broken threads
1641 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1642 entire_thread_visible_for_user?(activity, user)
1643 end
1644
1645 # do post-processing on a specific activity
1646 def contain_activity(%Activity{} = activity, %User{} = user) do
1647 contain_broken_threads(activity, user)
1648 end
1649
1650 def fetch_direct_messages_query do
1651 Activity
1652 |> restrict_type(%{type: "Create"})
1653 |> restrict_visibility(%{visibility: "direct"})
1654 |> order_by([activity], asc: activity.id)
1655 end
1656 end