9623e635a7e32a98354b4189adbf48637bec10f9
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> user.ap_id != actor end)
381 |> Enum.filter(fn user -> not is_nil(user.email) end)
382 |> Enum.each(fn superuser ->
383 superuser
384 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
385 |> Pleroma.Emails.Mailer.deliver_async()
386 end)
387
388 {:ok, activity}
389 else
390 {:error, error} -> Repo.rollback(error)
391 end
392 end
393
394 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
395 def move(%User{} = origin, %User{} = target, local \\ true) do
396 params = %{
397 "type" => "Move",
398 "actor" => origin.ap_id,
399 "object" => origin.ap_id,
400 "target" => target.ap_id
401 }
402
403 with true <- origin.ap_id in target.also_known_as,
404 {:ok, activity} <- insert(params, local),
405 _ <- notify_and_stream(activity) do
406 maybe_federate(activity)
407
408 BackgroundWorker.enqueue("move_following", %{
409 "origin_id" => origin.id,
410 "target_id" => target.id
411 })
412
413 {:ok, activity}
414 else
415 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
416 err -> err
417 end
418 end
419
420 def fetch_activities_for_context_query(context, opts) do
421 public = [Constants.as_public()]
422
423 recipients =
424 if opts[:user],
425 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
426 else: public
427
428 from(activity in Activity)
429 |> maybe_preload_objects(opts)
430 |> maybe_preload_bookmarks(opts)
431 |> maybe_set_thread_muted_field(opts)
432 |> restrict_blocked(opts)
433 |> restrict_recipients(recipients, opts[:user])
434 |> restrict_filtered(opts)
435 |> where(
436 [activity],
437 fragment(
438 "?->>'type' = ? and ?->>'context' = ?",
439 activity.data,
440 "Create",
441 activity.data,
442 ^context
443 )
444 )
445 |> exclude_poll_votes(opts)
446 |> exclude_id(opts)
447 |> order_by([activity], desc: activity.id)
448 end
449
450 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
451 def fetch_activities_for_context(context, opts \\ %{}) do
452 context
453 |> fetch_activities_for_context_query(opts)
454 |> Repo.all()
455 end
456
457 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
458 FlakeId.Ecto.CompatType.t() | nil
459 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
460 context
461 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
462 |> restrict_visibility(%{visibility: "direct"})
463 |> limit(1)
464 |> select([a], a.id)
465 |> Repo.one()
466 end
467
468 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
469 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
470 opts = Map.delete(opts, :user)
471
472 [Constants.as_public()]
473 |> fetch_activities_query(opts)
474 |> restrict_unlisted(opts)
475 |> Pagination.fetch_paginated(opts, pagination)
476 end
477
478 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
479 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
480 opts
481 |> Map.put(:restrict_unlisted, true)
482 |> fetch_public_or_unlisted_activities(pagination)
483 end
484
485 @valid_visibilities ~w[direct unlisted public private]
486
487 defp restrict_visibility(query, %{visibility: visibility})
488 when is_list(visibility) do
489 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
490 from(
491 a in query,
492 where:
493 fragment(
494 "activity_visibility(?, ?, ?) = ANY (?)",
495 a.actor,
496 a.recipients,
497 a.data,
498 ^visibility
499 )
500 )
501 else
502 Logger.error("Could not restrict visibility to #{visibility}")
503 end
504 end
505
506 defp restrict_visibility(query, %{visibility: visibility})
507 when visibility in @valid_visibilities do
508 from(
509 a in query,
510 where:
511 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
512 )
513 end
514
515 defp restrict_visibility(_query, %{visibility: visibility})
516 when visibility not in @valid_visibilities do
517 Logger.error("Could not restrict visibility to #{visibility}")
518 end
519
520 defp restrict_visibility(query, _visibility), do: query
521
522 defp exclude_visibility(query, %{exclude_visibilities: visibility})
523 when is_list(visibility) do
524 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
525 from(
526 a in query,
527 where:
528 not fragment(
529 "activity_visibility(?, ?, ?) = ANY (?)",
530 a.actor,
531 a.recipients,
532 a.data,
533 ^visibility
534 )
535 )
536 else
537 Logger.error("Could not exclude visibility to #{visibility}")
538 query
539 end
540 end
541
542 defp exclude_visibility(query, %{exclude_visibilities: visibility})
543 when visibility in @valid_visibilities do
544 from(
545 a in query,
546 where:
547 not fragment(
548 "activity_visibility(?, ?, ?) = ?",
549 a.actor,
550 a.recipients,
551 a.data,
552 ^visibility
553 )
554 )
555 end
556
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility not in [nil | @valid_visibilities] do
559 Logger.error("Could not exclude visibility to #{visibility}")
560 query
561 end
562
563 defp exclude_visibility(query, _visibility), do: query
564
565 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
566 do: query
567
568 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
569 do: query
570
571 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
572 from(
573 a in query,
574 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
575 )
576 end
577
578 defp restrict_thread_visibility(query, _, _), do: query
579
580 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
581 params =
582 params
583 |> Map.put(:user, reading_user)
584 |> Map.put(:actor_id, user.ap_id)
585
586 %{
587 godmode: params[:godmode],
588 reading_user: reading_user
589 }
590 |> user_activities_recipients()
591 |> fetch_activities(params)
592 |> Enum.reverse()
593 end
594
595 def fetch_user_activities(user, reading_user, params \\ %{})
596
597 def fetch_user_activities(user, reading_user, %{total: true} = params) do
598 result = fetch_activities_for_user(user, reading_user, params)
599
600 Keyword.put(result, :items, Enum.reverse(result[:items]))
601 end
602
603 def fetch_user_activities(user, reading_user, params) do
604 user
605 |> fetch_activities_for_user(reading_user, params)
606 |> Enum.reverse()
607 end
608
609 defp fetch_activities_for_user(user, reading_user, params) do
610 params =
611 params
612 |> Map.put(:type, ["Create", "Announce"])
613 |> Map.put(:user, reading_user)
614 |> Map.put(:actor_id, user.ap_id)
615 |> Map.put(:pinned_activity_ids, user.pinned_activities)
616
617 params =
618 if User.blocks?(reading_user, user) do
619 params
620 else
621 params
622 |> Map.put(:blocking_user, reading_user)
623 |> Map.put(:muting_user, reading_user)
624 end
625
626 pagination_type = Map.get(params, :pagination_type) || :keyset
627
628 %{
629 godmode: params[:godmode],
630 reading_user: reading_user
631 }
632 |> user_activities_recipients()
633 |> fetch_activities(params, pagination_type)
634 end
635
636 def fetch_statuses(reading_user, %{total: true} = params) do
637 result = fetch_activities_for_reading_user(reading_user, params)
638 Keyword.put(result, :items, Enum.reverse(result[:items]))
639 end
640
641 def fetch_statuses(reading_user, params) do
642 reading_user
643 |> fetch_activities_for_reading_user(params)
644 |> Enum.reverse()
645 end
646
647 defp fetch_activities_for_reading_user(reading_user, params) do
648 params = Map.put(params, :type, ["Create", "Announce"])
649
650 %{
651 godmode: params[:godmode],
652 reading_user: reading_user
653 }
654 |> user_activities_recipients()
655 |> fetch_activities(params, :offset)
656 end
657
658 defp user_activities_recipients(%{godmode: true}), do: []
659
660 defp user_activities_recipients(%{reading_user: reading_user}) do
661 if reading_user do
662 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
663 else
664 [Constants.as_public()]
665 end
666 end
667
668 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
669 raise "Can't use the child object without preloading!"
670 end
671
672 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
673 from(
674 [activity, object] in query,
675 where:
676 fragment(
677 "?->>'type' != ? or ?->>'actor' != ?",
678 activity.data,
679 "Announce",
680 object.data,
681 ^actor
682 )
683 )
684 end
685
686 defp restrict_announce_object_actor(query, _), do: query
687
688 defp restrict_since(query, %{since_id: ""}), do: query
689
690 defp restrict_since(query, %{since_id: since_id}) do
691 from(activity in query, where: activity.id > ^since_id)
692 end
693
694 defp restrict_since(query, _), do: query
695
696 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
697 raise_on_missing_preload()
698 end
699
700 defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
701 from(
702 [_activity, object] in query,
703 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
704 )
705 end
706
707 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
708 restrict_embedded_tag_any(query, %{tag: tag})
709 end
710
711 defp restrict_embedded_tag_all(query, _), do: query
712
713 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
714 raise_on_missing_preload()
715 end
716
717 defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag}) do
718 from(
719 [_activity, object] in query,
720 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
721 )
722 end
723
724 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
725 restrict_embedded_tag_any(query, %{tag: [tag]})
726 end
727
728 defp restrict_embedded_tag_any(query, _), do: query
729
730 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
731 raise_on_missing_preload()
732 end
733
734 defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
735 from(
736 [_activity, object] in query,
737 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
738 )
739 end
740
741 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
742 when is_binary(tag_reject) do
743 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
744 end
745
746 defp restrict_embedded_tag_reject_any(query, _), do: query
747
748 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
749 raise_on_missing_preload()
750 end
751
752 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
753 from(
754 [_activity, object] in query,
755 where:
756 fragment(
757 """
758 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
759 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
760 AND hashtags_objects.object_id = ?) @> ?
761 """,
762 ^tags,
763 object.id,
764 ^tags
765 )
766 )
767 end
768
769 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
770 restrict_hashtag_any(query, %{tag: tag})
771 end
772
773 defp restrict_hashtag_all(query, _), do: query
774
775 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
776 raise_on_missing_preload()
777 end
778
779 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
780 from(
781 [_activity, object] in query,
782 where:
783 fragment(
784 """
785 EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
786 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
787 AND hashtags_objects.object_id = ? LIMIT 1)
788 """,
789 ^tags,
790 object.id
791 )
792 )
793 end
794
795 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
796 restrict_hashtag_any(query, %{tag: [tag]})
797 end
798
799 defp restrict_hashtag_any(query, _), do: query
800
801 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
802 raise_on_missing_preload()
803 end
804
805 defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
806 from(
807 [_activity, object] in query,
808 where:
809 fragment(
810 """
811 NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
812 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
813 AND hashtags_objects.object_id = ? LIMIT 1)
814 """,
815 ^tags_reject,
816 object.id
817 )
818 )
819 end
820
821 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
822 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
823 end
824
825 defp restrict_hashtag_reject_any(query, _), do: query
826
827 defp raise_on_missing_preload do
828 raise "Can't use the child object without preloading!"
829 end
830
831 defp restrict_recipients(query, [], _user), do: query
832
833 defp restrict_recipients(query, recipients, nil) do
834 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
835 end
836
837 defp restrict_recipients(query, recipients, user) do
838 from(
839 activity in query,
840 where: fragment("? && ?", ^recipients, activity.recipients),
841 or_where: activity.actor == ^user.ap_id
842 )
843 end
844
845 defp restrict_local(query, %{local_only: true}) do
846 from(activity in query, where: activity.local == true)
847 end
848
849 defp restrict_local(query, _), do: query
850
851 defp restrict_remote(query, %{remote: true}) do
852 from(activity in query, where: activity.local == false)
853 end
854
855 defp restrict_remote(query, _), do: query
856
857 defp restrict_actor(query, %{actor_id: actor_id}) do
858 from(activity in query, where: activity.actor == ^actor_id)
859 end
860
861 defp restrict_actor(query, _), do: query
862
863 defp restrict_type(query, %{type: type}) when is_binary(type) do
864 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
865 end
866
867 defp restrict_type(query, %{type: type}) do
868 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
869 end
870
871 defp restrict_type(query, _), do: query
872
873 defp restrict_state(query, %{state: state}) do
874 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
875 end
876
877 defp restrict_state(query, _), do: query
878
879 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
880 from(
881 [_activity, object] in query,
882 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
883 )
884 end
885
886 defp restrict_favorited_by(query, _), do: query
887
888 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
889 raise "Can't use the child object without preloading!"
890 end
891
892 defp restrict_media(query, %{only_media: true}) do
893 from(
894 [activity, object] in query,
895 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
896 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
897 )
898 end
899
900 defp restrict_media(query, _), do: query
901
902 defp restrict_replies(query, %{exclude_replies: true}) do
903 from(
904 [_activity, object] in query,
905 where: fragment("?->>'inReplyTo' is null", object.data)
906 )
907 end
908
909 defp restrict_replies(query, %{
910 reply_filtering_user: %User{} = user,
911 reply_visibility: "self"
912 }) do
913 from(
914 [activity, object] in query,
915 where:
916 fragment(
917 "?->>'inReplyTo' is null OR ? = ANY(?)",
918 object.data,
919 ^user.ap_id,
920 activity.recipients
921 )
922 )
923 end
924
925 defp restrict_replies(query, %{
926 reply_filtering_user: %User{} = user,
927 reply_visibility: "following"
928 }) do
929 from(
930 [activity, object] in query,
931 where:
932 fragment(
933 """
934 ?->>'type' != 'Create' -- This isn't a Create
935 OR ?->>'inReplyTo' is null -- this isn't a reply
936 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
937 -- unless they are the author (because authors
938 -- are also part of the recipients). This leads
939 -- to a bug that self-replies by friends won't
940 -- show up.
941 OR ? = ? -- The actor is us
942 """,
943 activity.data,
944 object.data,
945 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
946 activity.recipients,
947 activity.actor,
948 activity.actor,
949 ^user.ap_id
950 )
951 )
952 end
953
954 defp restrict_replies(query, _), do: query
955
956 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
957 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
958 end
959
960 defp restrict_reblogs(query, _), do: query
961
962 defp restrict_muted(query, %{with_muted: true}), do: query
963
964 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
965 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
966
967 query =
968 from([activity] in query,
969 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
970 where:
971 fragment(
972 "not (?->'to' \\?| ?) or ? = ?",
973 activity.data,
974 ^mutes,
975 activity.actor,
976 ^user.ap_id
977 )
978 )
979
980 unless opts[:skip_preload] do
981 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
982 else
983 query
984 end
985 end
986
987 defp restrict_muted(query, _), do: query
988
989 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
990 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
991 domain_blocks = user.domain_blocks || []
992
993 following_ap_ids = User.get_friends_ap_ids(user)
994
995 query =
996 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
997
998 from(
999 [activity, object: o] in query,
1000 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1001 where:
1002 fragment(
1003 "((not (? && ?)) or ? = ?)",
1004 activity.recipients,
1005 ^blocked_ap_ids,
1006 activity.actor,
1007 ^user.ap_id
1008 ),
1009 where:
1010 fragment(
1011 "recipients_contain_blocked_domains(?, ?) = false",
1012 activity.recipients,
1013 ^domain_blocks
1014 ),
1015 where:
1016 fragment(
1017 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1018 activity.data,
1019 activity.data,
1020 ^blocked_ap_ids
1021 ),
1022 where:
1023 fragment(
1024 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1025 activity.actor,
1026 ^domain_blocks,
1027 activity.actor,
1028 ^following_ap_ids
1029 ),
1030 where:
1031 fragment(
1032 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1033 o.data,
1034 ^domain_blocks,
1035 o.data,
1036 ^following_ap_ids
1037 )
1038 )
1039 end
1040
1041 defp restrict_blocked(query, _), do: query
1042
1043 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1044 from(
1045 activity in query,
1046 where:
1047 fragment(
1048 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1049 activity.data,
1050 ^[Constants.as_public()]
1051 )
1052 )
1053 end
1054
1055 defp restrict_unlisted(query, _), do: query
1056
1057 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1058 from(activity in query, where: activity.id in ^ids)
1059 end
1060
1061 defp restrict_pinned(query, _), do: query
1062
1063 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1064 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1065
1066 from(
1067 activity in query,
1068 where:
1069 fragment(
1070 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1071 activity.data,
1072 activity.actor,
1073 ^muted_reblogs
1074 )
1075 )
1076 end
1077
1078 defp restrict_muted_reblogs(query, _), do: query
1079
1080 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1081 from(
1082 activity in query,
1083 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1084 )
1085 end
1086
1087 defp restrict_instance(query, _), do: query
1088
1089 defp restrict_filtered(query, %{user: %User{} = user}) do
1090 case Filter.compose_regex(user) do
1091 nil ->
1092 query
1093
1094 regex ->
1095 from([activity, object] in query,
1096 where:
1097 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1098 activity.actor == ^user.ap_id
1099 )
1100 end
1101 end
1102
1103 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1104 restrict_filtered(query, %{user: user})
1105 end
1106
1107 defp restrict_filtered(query, _), do: query
1108
1109 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1110
1111 defp exclude_poll_votes(query, _) do
1112 if has_named_binding?(query, :object) do
1113 from([activity, object: o] in query,
1114 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1115 )
1116 else
1117 query
1118 end
1119 end
1120
1121 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1122
1123 defp exclude_chat_messages(query, _) do
1124 if has_named_binding?(query, :object) do
1125 from([activity, object: o] in query,
1126 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1127 )
1128 else
1129 query
1130 end
1131 end
1132
1133 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1134
1135 defp exclude_invisible_actors(query, _opts) do
1136 invisible_ap_ids =
1137 User.Query.build(%{invisible: true, select: [:ap_id]})
1138 |> Repo.all()
1139 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1140
1141 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1142 end
1143
1144 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1145 from(activity in query, where: activity.id != ^id)
1146 end
1147
1148 defp exclude_id(query, _), do: query
1149
1150 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1151
1152 defp maybe_preload_objects(query, _) do
1153 query
1154 |> Activity.with_preloaded_object()
1155 end
1156
1157 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1158
1159 defp maybe_preload_bookmarks(query, opts) do
1160 query
1161 |> Activity.with_preloaded_bookmark(opts[:user])
1162 end
1163
1164 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1165 query
1166 |> Activity.with_preloaded_report_notes()
1167 end
1168
1169 defp maybe_preload_report_notes(query, _), do: query
1170
1171 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1172
1173 defp maybe_set_thread_muted_field(query, opts) do
1174 query
1175 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1176 end
1177
1178 defp maybe_order(query, %{order: :desc}) do
1179 query
1180 |> order_by(desc: :id)
1181 end
1182
1183 defp maybe_order(query, %{order: :asc}) do
1184 query
1185 |> order_by(asc: :id)
1186 end
1187
1188 defp maybe_order(query, _), do: query
1189
1190 defp fetch_activities_query_ap_ids_ops(opts) do
1191 source_user = opts[:muting_user]
1192 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1193
1194 ap_id_relationships =
1195 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1196 [:block | ap_id_relationships]
1197 else
1198 ap_id_relationships
1199 end
1200
1201 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1202
1203 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1204 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1205
1206 restrict_muted_reblogs_opts =
1207 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1208
1209 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1210 end
1211
1212 def fetch_activities_query(recipients, opts \\ %{}) do
1213 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1214 fetch_activities_query_ap_ids_ops(opts)
1215
1216 config = %{
1217 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1218 }
1219
1220 query =
1221 Activity
1222 |> maybe_preload_objects(opts)
1223 |> maybe_preload_bookmarks(opts)
1224 |> maybe_preload_report_notes(opts)
1225 |> maybe_set_thread_muted_field(opts)
1226 |> maybe_order(opts)
1227 |> restrict_recipients(recipients, opts[:user])
1228 |> restrict_replies(opts)
1229 |> restrict_since(opts)
1230 |> restrict_local(opts)
1231 |> restrict_remote(opts)
1232 |> restrict_actor(opts)
1233 |> restrict_type(opts)
1234 |> restrict_state(opts)
1235 |> restrict_favorited_by(opts)
1236 |> restrict_blocked(restrict_blocked_opts)
1237 |> restrict_muted(restrict_muted_opts)
1238 |> restrict_filtered(opts)
1239 |> restrict_media(opts)
1240 |> restrict_visibility(opts)
1241 |> restrict_thread_visibility(opts, config)
1242 |> restrict_reblogs(opts)
1243 |> restrict_pinned(opts)
1244 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1245 |> restrict_instance(opts)
1246 |> restrict_announce_object_actor(opts)
1247 |> restrict_filtered(opts)
1248 |> Activity.restrict_deactivated_users()
1249 |> exclude_poll_votes(opts)
1250 |> exclude_chat_messages(opts)
1251 |> exclude_invisible_actors(opts)
1252 |> exclude_visibility(opts)
1253
1254 if Config.get([:database, :improved_hashtag_timeline]) do
1255 query
1256 |> restrict_hashtag_any(opts)
1257 |> restrict_hashtag_all(opts)
1258 |> restrict_hashtag_reject_any(opts)
1259 else
1260 query
1261 |> restrict_embedded_tag_any(opts)
1262 |> restrict_embedded_tag_all(opts)
1263 |> restrict_embedded_tag_reject_any(opts)
1264 end
1265 end
1266
1267 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1268 list_memberships = Pleroma.List.memberships(opts[:user])
1269
1270 fetch_activities_query(recipients ++ list_memberships, opts)
1271 |> Pagination.fetch_paginated(opts, pagination)
1272 |> Enum.reverse()
1273 |> maybe_update_cc(list_memberships, opts[:user])
1274 end
1275
1276 @doc """
1277 Fetch favorites activities of user with order by sort adds to favorites
1278 """
1279 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1280 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1281 user.ap_id
1282 |> Activity.Queries.by_actor()
1283 |> Activity.Queries.by_type("Like")
1284 |> Activity.with_joined_object()
1285 |> Object.with_joined_activity()
1286 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1287 |> order_by([like, _, _], desc_nulls_last: like.id)
1288 |> Pagination.fetch_paginated(
1289 Map.merge(params, %{skip_order: true}),
1290 pagination
1291 )
1292 end
1293
1294 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1295 Enum.map(activities, fn
1296 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1297 if Enum.any?(bcc, &(&1 in list_memberships)) do
1298 update_in(activity.data["cc"], &[user_ap_id | &1])
1299 else
1300 activity
1301 end
1302
1303 activity ->
1304 activity
1305 end)
1306 end
1307
1308 defp maybe_update_cc(activities, _, _), do: activities
1309
1310 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1311 from(activity in query,
1312 where:
1313 fragment("? && ?", activity.recipients, ^recipients) or
1314 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1315 ^Constants.as_public() in activity.recipients)
1316 )
1317 end
1318
1319 def fetch_activities_bounded(
1320 recipients,
1321 recipients_with_public,
1322 opts \\ %{},
1323 pagination \\ :keyset
1324 ) do
1325 fetch_activities_query([], opts)
1326 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1327 |> Pagination.fetch_paginated(opts, pagination)
1328 |> Enum.reverse()
1329 end
1330
1331 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1332 def upload(file, opts \\ []) do
1333 with {:ok, data} <- Upload.store(file, opts) do
1334 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1335
1336 Repo.insert(%Object{data: obj_data})
1337 end
1338 end
1339
1340 @spec get_actor_url(any()) :: binary() | nil
1341 defp get_actor_url(url) when is_binary(url), do: url
1342 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1343
1344 defp get_actor_url(url) when is_list(url) do
1345 url
1346 |> List.first()
1347 |> get_actor_url()
1348 end
1349
1350 defp get_actor_url(_url), do: nil
1351
1352 defp object_to_user_data(data) do
1353 avatar =
1354 data["icon"]["url"] &&
1355 %{
1356 "type" => "Image",
1357 "url" => [%{"href" => data["icon"]["url"]}]
1358 }
1359
1360 banner =
1361 data["image"]["url"] &&
1362 %{
1363 "type" => "Image",
1364 "url" => [%{"href" => data["image"]["url"]}]
1365 }
1366
1367 fields =
1368 data
1369 |> Map.get("attachment", [])
1370 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1371 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1372
1373 emojis =
1374 data
1375 |> Map.get("tag", [])
1376 |> Enum.filter(fn
1377 %{"type" => "Emoji"} -> true
1378 _ -> false
1379 end)
1380 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1381 {String.trim(name, ":"), url}
1382 end)
1383
1384 is_locked = data["manuallyApprovesFollowers"] || false
1385 capabilities = data["capabilities"] || %{}
1386 accepts_chat_messages = capabilities["acceptsChatMessages"]
1387 data = Transmogrifier.maybe_fix_user_object(data)
1388 is_discoverable = data["discoverable"] || false
1389 invisible = data["invisible"] || false
1390 actor_type = data["type"] || "Person"
1391
1392 public_key =
1393 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1394 data["publicKey"]["publicKeyPem"]
1395 else
1396 nil
1397 end
1398
1399 shared_inbox =
1400 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1401 data["endpoints"]["sharedInbox"]
1402 else
1403 nil
1404 end
1405
1406 user_data = %{
1407 ap_id: data["id"],
1408 uri: get_actor_url(data["url"]),
1409 ap_enabled: true,
1410 banner: banner,
1411 fields: fields,
1412 emoji: emojis,
1413 is_locked: is_locked,
1414 is_discoverable: is_discoverable,
1415 invisible: invisible,
1416 avatar: avatar,
1417 name: data["name"],
1418 follower_address: data["followers"],
1419 following_address: data["following"],
1420 bio: data["summary"] || "",
1421 actor_type: actor_type,
1422 also_known_as: Map.get(data, "alsoKnownAs", []),
1423 public_key: public_key,
1424 inbox: data["inbox"],
1425 shared_inbox: shared_inbox,
1426 accepts_chat_messages: accepts_chat_messages
1427 }
1428
1429 # nickname can be nil because of virtual actors
1430 if data["preferredUsername"] do
1431 Map.put(
1432 user_data,
1433 :nickname,
1434 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1435 )
1436 else
1437 Map.put(user_data, :nickname, nil)
1438 end
1439 end
1440
1441 def fetch_follow_information_for_user(user) do
1442 with {:ok, following_data} <-
1443 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1444 {:ok, hide_follows} <- collection_private(following_data),
1445 {:ok, followers_data} <-
1446 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1447 {:ok, hide_followers} <- collection_private(followers_data) do
1448 {:ok,
1449 %{
1450 hide_follows: hide_follows,
1451 follower_count: normalize_counter(followers_data["totalItems"]),
1452 following_count: normalize_counter(following_data["totalItems"]),
1453 hide_followers: hide_followers
1454 }}
1455 else
1456 {:error, _} = e -> e
1457 e -> {:error, e}
1458 end
1459 end
1460
1461 defp normalize_counter(counter) when is_integer(counter), do: counter
1462 defp normalize_counter(_), do: 0
1463
1464 def maybe_update_follow_information(user_data) do
1465 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1466 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1467 {_, true} <-
1468 {:collections_available,
1469 !!(user_data[:following_address] && user_data[:follower_address])},
1470 {:ok, info} <-
1471 fetch_follow_information_for_user(user_data) do
1472 info = Map.merge(user_data[:info] || %{}, info)
1473
1474 user_data
1475 |> Map.put(:info, info)
1476 else
1477 {:user_type_check, false} ->
1478 user_data
1479
1480 {:collections_available, false} ->
1481 user_data
1482
1483 {:enabled, false} ->
1484 user_data
1485
1486 e ->
1487 Logger.error(
1488 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1489 )
1490
1491 user_data
1492 end
1493 end
1494
1495 defp collection_private(%{"first" => %{"type" => type}})
1496 when type in ["CollectionPage", "OrderedCollectionPage"],
1497 do: {:ok, false}
1498
1499 defp collection_private(%{"first" => first}) do
1500 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1501 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1502 {:ok, false}
1503 else
1504 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1505 {:error, _} = e -> e
1506 e -> {:error, e}
1507 end
1508 end
1509
1510 defp collection_private(_data), do: {:ok, true}
1511
1512 def user_data_from_user_object(data) do
1513 with {:ok, data} <- MRF.filter(data) do
1514 {:ok, object_to_user_data(data)}
1515 else
1516 e -> {:error, e}
1517 end
1518 end
1519
1520 def fetch_and_prepare_user_from_ap_id(ap_id) do
1521 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1522 {:ok, data} <- user_data_from_user_object(data) do
1523 {:ok, maybe_update_follow_information(data)}
1524 else
1525 # If this has been deleted, only log a debug and not an error
1526 {:error, "Object has been deleted" = e} ->
1527 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1528 {:error, e}
1529
1530 {:error, {:reject, reason} = e} ->
1531 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1532 {:error, e}
1533
1534 {:error, e} ->
1535 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1536 {:error, e}
1537 end
1538 end
1539
1540 def maybe_handle_clashing_nickname(data) do
1541 with nickname when is_binary(nickname) <- data[:nickname],
1542 %User{} = old_user <- User.get_by_nickname(nickname),
1543 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1544 Logger.info(
1545 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1546 data[:ap_id]
1547 }, renaming."
1548 )
1549
1550 old_user
1551 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1552 |> User.update_and_set_cache()
1553 else
1554 {:ap_id_comparison, true} ->
1555 Logger.info(
1556 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1557 )
1558
1559 _ ->
1560 nil
1561 end
1562 end
1563
1564 def make_user_from_ap_id(ap_id) do
1565 user = User.get_cached_by_ap_id(ap_id)
1566
1567 if user && !User.ap_enabled?(user) do
1568 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1569 else
1570 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1571 if user do
1572 user
1573 |> User.remote_user_changeset(data)
1574 |> User.update_and_set_cache()
1575 else
1576 maybe_handle_clashing_nickname(data)
1577
1578 data
1579 |> User.remote_user_changeset()
1580 |> Repo.insert()
1581 |> User.set_cache()
1582 end
1583 end
1584 end
1585 end
1586
1587 def make_user_from_nickname(nickname) do
1588 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1589 make_user_from_ap_id(ap_id)
1590 else
1591 _e -> {:error, "No AP id in WebFinger"}
1592 end
1593 end
1594
1595 # filter out broken threads
1596 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1597 entire_thread_visible_for_user?(activity, user)
1598 end
1599
1600 # do post-processing on a specific activity
1601 def contain_activity(%Activity{} = activity, %User{} = user) do
1602 contain_broken_threads(activity, user)
1603 end
1604
1605 def fetch_direct_messages_query do
1606 Activity
1607 |> restrict_type(%{type: "Create"})
1608 |> restrict_visibility(%{visibility: "direct"})
1609 |> order_by([activity], asc: activity.id)
1610 end
1611 end