5392ce7c9eaba8836c35b5f3554a39d4d9c59003
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`. `to` and `cc` are the raw values read from
# `data` (defaulting to `[]` when the key is absent); `recipients` is the
# concatenation of "to", "cc" and "bcc".
#
# For "Create" activities the actor is added to the recipients and the result
# is de-duplicated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 maps a missing (nil) actor to [] and a single actor to [actor].
  # The previous `[Map.get(data, "actor", [])]` produced `[[]]` when the actor
  # was absent, which put an empty list into the recipients as an element.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# An activity without an actor passes the check.
defp check_actor_is_active(nil), do: true

# True only when the cached user for this AP id exists and has `is_active: true`.
defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
63
# Checks the embedded object's content length against the configured
# `[:instance, :remote_limit]`. Activities without (non-nil) content pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

defp check_remote_limit(_), do: true
70
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# For a "Create" whose object has an "inReplyTo", increments the replies
# count of the replied-to object, but only when the new object is public
# (returns nil otherwise). Anything else is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
# Types that are persisted as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    failure -> failure
  end
end

# Everything else is stored as an activity; `meta` must contain `:local`.
# Any step that does not match is returned to the caller as-is.
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)

  with {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }),
       # TODO: add tests for expired activities, when Note type will be supported in new pipeline
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
# Validates, filters and stores the activity described by `map`.
#
# Arguments:
#   * `map` - the activity data
#   * `local` - stored in the activity's `local` field
#   * `fake` - when true nothing is written; an in-memory activity with the
#     id "pleroma:fakeid" is returned instead
#   * `bypass_actor_check` - when true, `check_actor_is_active/1` is skipped
#
# Returns `{:ok, activity}` on success. Failures are `{:error, reason}`,
# except a containment failure, which is returned as the `{:containment, _}`
# tuple itself.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # If Activity.normalize/1 yields an %Activity{} the chain stops and that
  # activity is returned from the else branch (presumably it already exists).
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # For fake inserts this step never matches, which diverts to the
       # `{:fake, true, ...}` else clause before anything is persisted.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media is fetched in a separate task, started under the concurrency limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    %Activity{} = activity ->
      {:ok, activity}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error

    {:fake, true, map, recipients} ->
      # Build the activity in memory only; it is never inserted.
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = e ->
      {:error, e}
  end
end
167
# Inserts the activity row and, on success, schedules its expiration when the
# data carries one. An unsuccessful insert result is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  new_activity = %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }

  case Repo.insert(new_activity) do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
# Creates notifications for the activity, creates or bumps its conversation,
# then streams out the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# When the activity data has a `DateTime` under "expires_at", enqueues a
# PurgeExpiredActivity job for it. Returns `{:ok, activity}` on success or
# the enqueue result when it is not `{:ok, _}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

# No (DateTime) expiry present: nothing to schedule.
defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for the activity and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# produce the expected shape, that lookup's result is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    no_conversation ->
      no_conversation
  end
end
211
# Returns the freshly (force-)preloaded participations of the conversation,
# or an empty list when no conversation was produced.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
219
# Preloads each participation's user and pushes the list to the
# "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
# Streams the participations of the conversation the object's context belongs
# to, but only if a direct activity in that context is still found for `user`.
# When no conversation exists, the lookup result is returned as-is.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id, do: stream_out_participations(conversation.participations)

    not_found ->
      not_found
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# for them; any other activity is not streamed.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
# Runs do_create/2 inside a database transaction and unwraps a committed
# result. A transaction result that is not `{:ok, _}` is returned unchanged.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
267
# Builds the "Create" activity data from `params` and inserts it. Called from
# create/2 inside a transaction, so `{:error, _}` results roll it back.
#
# After a real (non-fake) insert it bumps the reply counter, and, unless the
# `:env` config is `:benchmark`, also bumps the actor's note count, notifies,
# streams and federates.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  # In the benchmark environment everything after the reply-count update is skipped.
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       # Fake activities stop here; the else branch returns them untouched.
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
300
# Builds a listen activity from `params`, inserts it, then notifies, streams
# and federates it. The first step that does not match is returned as-is.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only an explicit `false` turns local off
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
# Runs do_unfollow/4 inside a database transaction and unwraps a committed
# result. A transaction result that is not `{:ok, _}` is returned unchanged.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction_result =
    Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction_result do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Cancels the latest follow from `follower` to `followed` and inserts the
# matching unfollow activity. Called from unfollow/4 inside a transaction.
#
# Returns `{:ok, activity}`, or nil when no follow activity is found.
# An `{:error, _}` from any step rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       # Mark the original follow as cancelled before recording the unfollow.
       {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end
343
# Runs do_flag/1 inside a database transaction and unwraps a committed
# result. A transaction result that is not `{:ok, _}` is returned unchanged.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Inserts a report (flag) activity, federates a stripped copy of it and mails
# the report to every superuser who has an email address and is not the
# reporter. Called from flag/1 inside a transaction, so `{:error, _}` results
# roll it back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only an explicit `false` turns these off
  local = params[:local] != false
  forward = params[:forward] != false

  # The reported account is only addressed (via "cc") when forwarding.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(),
        superuser.ap_id != actor,
        not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
393
# Records a "Move" from `origin` to `target`. The target must list the
# origin's AP id in `also_known_as`; otherwise an error tuple is returned.
# On success the activity is federated and a "move_following" background job
# is enqueued. A failed insert result is returned as-is.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    with {:ok, activity} <- insert(move_data, local),
         _ <- notify_and_stream(activity) do
      maybe_federate(activity)

      BackgroundWorker.enqueue("move_following", %{
        "origin_id" => origin.id,
        "target_id" => target.id
      })

      {:ok, activity}
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
419
# Builds the query for the "Create" activities of `context`, newest first.
# With `opts[:user]` set, recipients are that user, the result of
# User.following/1 for them, and public; otherwise public only.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
449
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
456
# Returns the id of the newest direct-visibility activity in `context` that
# the context query yields for `opts`, or nil when there is none.
#
# Preloading is skipped by default (`skip_preload: true`); a value given in
# `opts` takes precedence.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # The spec allows a keyword list, but Map.merge/2 raises BadMapError on one,
  # so normalise to a map first. Map.new/1 returns a map unchanged.
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
467
# Fetches a page of activities addressed to public. Any `:user` in `opts` is
# dropped before querying; unlisted activities are filtered out only when
# `opts` asks for it via `:restrict_unlisted`.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  recipients = [Constants.as_public()]

  recipients
  |> fetch_activities_query(opts)
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
477
# Same as fetch_public_or_unlisted_activities/2, with `:restrict_unlisted`
# forced to true.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
484
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed visibility is the given
# value, or any of the given list of values.
#
# An invalid visibility is logged and the query is returned unrestricted.
# The invalid branches used to return the result of Logger.error/1 (`:ok`)
# in place of a query, which broke the surrounding pipeline. Returning the
# query, and treating nil as "not given", matches exclude_visibility/2.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
521
# Removes activities whose computed visibility is the given value, or any of
# the given list of values. Invalid values are logged and the query is left
# unchanged; a nil or absent option is ignored.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn v -> v in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
564
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped through the third argument or on the user.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

# No reading user: nothing to restrict.
defp restrict_thread_visibility(query, _, _), do: query
579
# Fetches activities authored by `user` as seen by `reading_user` and returns
# them in reversed order. No activity type restriction is added here.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
594
# Fetches the "Create"/"Announce" activities of `user` as seen by
# `reading_user`, in reversed order. With `total: true` the fetch result is a
# keyword list and only its `:items` entry is reversed.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
608
# Shared fetch behind fetch_user_activities/3. Block and mute filtering for
# the reader is only added when the reader does not block `user`. Pagination
# defaults to `:keyset` unless `params[:pagination_type]` says otherwise.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
635
# Fetches "Create"/"Announce" activities visible to `reading_user`, in
# reversed order. With `total: true` the fetch result is a keyword list and
# only its `:items` entry is reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])
  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
646
# Shared fetch behind fetch_statuses/2; always uses `:offset` pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
657
# Recipient list used for a reader: empty in godmode; otherwise public plus,
# for a known reader, their own AP id and the result of User.following/1.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
667
# Drops "Announce" activities whose announced object was authored by the
# filtering user. Needs the joined object, so it raises with skip_preload.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
687
# Keeps only activities with an id greater than `since_id`. An empty string
# or a missing option leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
695
# Requires the object's embedded "tag" data to contain all given tags
# (compared in lower case). A single binary tag is delegated to the "any"
# variant. Needs the joined object, so it raises with skip_preload.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  lowered = Enum.map(tag_all, &String.downcase/1)

  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^lowered))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
714
# Requires the object's embedded "tag" data to contain at least one of the
# given tags (compared in lower case). A single binary tag is wrapped in a
# list. Needs the joined object, so it raises with skip_preload.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  lowered = Enum.map(tag_any, &String.downcase/1)

  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^lowered))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
733
# Excludes objects whose embedded "tag" data contains any of the given tags
# (compared in lower case). A single binary tag is wrapped in a list. Needs
# the joined object, so it raises with skip_preload.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  lowered = Enum.map(tag_reject, &String.downcase/1)

  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^lowered)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
753
# Requires the object to be linked, through hashtags_objects, to every one of
# the given hashtags. A one-element list is handled by the "any" variant and
# a single binary tag is wrapped in a list. Needs the joined object, so it
# raises with skip_preload.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _), do: query
784
# Requires the object to be linked, through hashtags_objects, to at least one
# of the given hashtags. A single binary tag is wrapped in a list. Needs the
# joined object, so it raises with skip_preload.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  # TODO: refactor: debug / experimental feature
  case Config.get([:database, :improved_hashtag_timeline]) do
    :preselect_hashtag_ids ->
      # Resolve the hashtag ids with a separate query first, then filter on them.
      hashtag_ids =
        Pleroma.Hashtag
        |> where([ht], fragment("name = ANY(?::citext[])", ^tags))
        |> select([ht], ht.id)
        |> Repo.all()

      where(
        query,
        [_activity, object],
        fragment(
          """
          EXISTS (
          SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1)
          """,
          ^hashtag_ids,
          object.id
        )
      )

    _other ->
      where(
        query,
        [_activity, object],
        fragment(
          """
          EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
          ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
          AND hashtags_objects.object_id = ? LIMIT 1)
          """,
          ^tags,
          object.id
        )
      )
  end
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
833
# Excludes objects linked, through hashtags_objects, to any of the given
# hashtags. A single binary tag is wrapped in a list. Needs the joined
# object, so it raises with skip_preload.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^tags_reject,
      object.id
    )
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
859
# Shared failure for filters that need the joined object while the caller
# asked to skip preloading.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
863
# Keeps activities whose recipients overlap `recipients`. With a user given,
# activities authored by that user are kept as well. An empty recipient list
# applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
877
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
883
# With `remote: true`, keeps only activities not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
889
# Keeps only activities whose actor equals `actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
895
# Restricts by the activity's "type": exact match for a binary, `= ANY(...)`
# for any other given value (callers in this file pass a list).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
905
# Keeps only activities whose "state" equals `state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
911
# Keeps only activities whose object lists `ap_id` under "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
920
# With `only_media: true`, keeps "Create" activities whose object's
# "attachment" is not an empty list. Needs the joined object, so it raises
# with skip_preload.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
934
# Reply filtering, selected by the options given:
#   * `exclude_replies: true` - keep only objects without "inReplyTo"
#   * `reply_visibility: "self"` - keep non-replies, plus replies that have
#     the filtering user among the activity's recipients
#   * `reply_visibility: "following"` - see the SQL comments in that clause
#   * anything else - no restriction
defp restrict_replies(query, %{exclude_replies: true}) do
  from(
    [_activity, object] in query,
    where: fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  from(
    [activity, object] in query,
    where:
      fragment(
        "?->>'inReplyTo' is null OR ? = ANY(?)",
        object.data,
        ^user.ap_id,
        activity.recipients
      )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  # The fragment arguments below are positional: they bind to the `?`
  # placeholders in order of appearance in the SQL text.
  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create'     -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
                                   -- unless they are the author (because authors
                                   -- are also part of the recipients). This leads
                                   -- to a bug that self-replies by friends won't
                                   -- show up.
        OR ? = ?                   -- The actor is us
        """,
        activity.data,
        object.data,
        ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _), do: query
988
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
994
# Hides activities from muted actors, and activities addressed ("to") to
# muted actors unless the muting user is the author. When preloading is not
# skipped, rows with a matching `thread_mute` binding are hidden as well.
# `with_muted: true` disables all of this.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_actors =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_actors
  else
    where(without_muted_actors, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1021
# Applies the blocking user's user blocks and domain blocks to `query`.
# Blocked AP ids come from `opts[:blocked_users_ap_ids]` when provided,
# otherwise from User.blocked_users_ap_ids/1. Without a `blocking_user` the
# query is returned unchanged.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  # The last filter reads the object's data, so join the object unless a
  # binding named :object is already present.
  query =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  from(
    [activity, object: o] in query,
    # The activity's actor is not blocked.
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
    # No recipient is blocked, unless the blocking user is the author.
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),
    # NOTE(review): recipients_contain_blocked_domains is a database function
    # not defined here; presumably it checks recipient hosts - confirm.
    where:
      fragment(
        "recipients_contain_blocked_domains(?, ?) = false",
        activity.recipients,
        ^domain_blocks
      ),
    # Not an Announce whose "to" contains a blocked AP id.
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),
    # The actor's host (third '/'-separated part of the AP id) is not domain
    # blocked, unless the actor is in `following_ap_ids`.
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),
    # Same host rule, applied to the actor of the joined object.
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
1075
# With `restrict_unlisted: true`, drops activities that have the public
# address in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1089
# With `pinned: true`, keeps only the activities whose id is in
# `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
1095
# Drops "Announce" activities made by actors whose reblogs the muting user
# has muted. The list comes from `opts[:reblog_muted_users_ap_ids]` when
# provided, otherwise from User.reblog_muted_users_ap_ids/1.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1112
# When `:instance` is a binary, keeps only activities whose actor URL has
# that value as its third "/"-separated segment (the host part).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _opts), do: query
1121
# Hides activities whose object content matches the regex composed from the
# user's filters, except for activities authored by that user. When
# `Filter.compose_regex/1` returns nil the query is left untouched.
#
# The object is addressed as the query's second positional binding.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

# Without a `:user`, fall back to the `:blocking_user` as the filter owner.
defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _opts), do: query
1141
# Unless `include_poll_votes: true` is given, drops activities whose joined
# object is of type "Answer". Only applied when the query already carries
# the named `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1153
# Unless `include_chat_messages: true` is given, drops activities whose
# joined object is of type "ChatMessage". Only applied when the query
# already carries the named `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))

    false ->
      query
  end
end
1165
# Unless `invisible_actors: true` is given, removes activities authored by
# users flagged as invisible. The invisible users' ap_ids are loaded from
# the database first and then used in a `not in` condition.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn user -> user.ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1176
# Leaves out the activity with the given id when `:exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _opts), do: query
1182
# Applies `Activity.with_preloaded_object/1` unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1189
# Applies `Activity.with_preloaded_bookmark/2` for `opts[:user]` unless
# `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1196
# Applies `Activity.with_preloaded_report_notes/1` only when explicitly
# requested through `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1203
# Applies `Activity.with_set_thread_muted_field/2` unless
# `skip_preload: true`. The `:muting_user` is preferred, with `:user` as
# fallback.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1210
# Orders by id in the direction given under `:order` (`:desc` or `:asc`);
# any other options leave the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1222
# Loads the muting user's outgoing relationship ap_ids with a single call
# and returns three option maps (for the block, mute and reblog-mute
# restrictions), each seeded with the matching preloaded list. Values
# already present in `opts` take precedence over the preloaded ones.
#
# Block relationships are only preloaded when the blocking user is the same
# as the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  muter = opts[:muting_user]
  blocker = opts[:blocking_user]

  mute_types = if muter, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocker && blocker == muter, do: [:block | mute_types], else: mute_types

  preloaded = User.outgoing_relationships_ap_ids(muter, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1244
@doc """
Builds the activity query for the given `recipients`.

The query starts from `Activity` and is piped through the preload, ordering,
restriction and exclusion helpers of this module, each of which inspects
`opts` and either narrows the query or returns it unchanged. The hashtag
restrictions applied at the end depend on the
`[:database, :improved_hashtag_timeline]` setting.

Returns the composed query; nothing is executed here apart from the lookups
the individual helpers perform while building their conditions.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: a second pass would only duplicate the same
    # WHERE predicate and recompute the user's filter regex.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.get([:database, :improved_hashtag_timeline]) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1299
@doc """
Fetches a page of activities addressed to `recipients` or to any list the
user in `opts[:user]` is a member of.

The paginated result is reversed before being returned, and activities
delivered through a list membership get the user's ap_id added to "cc".
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, user)
end
1308
@doc """
Fetches the activities a user has favourited, newest favourite first.

Ordering and pagination are driven by the id of the "Like" activity rather
than by the id of the favourited activity itself.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  favourites_query =
    likes_with_targets
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The order is already set above, so the paginator is told to skip its own.
  Pagination.fetch_paginated(
    favourites_query,
    Map.put(params, :skip_order, true),
    pagination
  )
end
1326
# Adds the user's ap_id to the "cc" of every activity whose "bcc" contains
# one of the user's list memberships.
#
# Only applies when there is at least one list membership and a user is
# given; otherwise the activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single bare value; wrap it so that
        # prepending always yields a proper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1342
# Keeps activities whose recipients overlap `recipients`, or whose
# recipients overlap `recipients_with_public` and also include the public
# address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1351
@doc """
Fetches a page of activities using the bounded recipient condition built
from `recipients` and `recipients_with_public`.

The base query is built with an empty recipient list, and the paginated
result is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1363
# Stores `file` via `Upload.store/2` and inserts an `Object` holding the
# resulting data, with "actor" added when `opts[:actor]` is present.
# Anything other than `{:ok, data}` from the store step is returned as is.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1372
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map with a binary "href", or a list of such values (only the
# first element is considered). Returns nil for anything else.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
1384
# Converts a fetched ActivityPub actor object (string-keyed map) into the
# atom-keyed attribute map used to build or update a remote user.
#
# "attachment" and "tag" come from remote servers, so they are read
# tolerantly: a missing/null value counts as empty, a single object counts
# as a one-element list, and malformed entries are skipped instead of
# raising.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # Profile fields are the "PropertyValue" attachments; anything that is not
  # a map with that type is ignored.
  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Custom emoji: shortcode (colons trimmed) => icon URL. Emoji tags lacking
  # an icon URL or a binary name are skipped.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name}
        when is_binary(name) <- List.wrap(data["tag"]),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  # Everything below reads from the object as adjusted by the Transmogrifier.
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1473
@doc """
Fetches the remote following and follower collections of `user` and
summarises them.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:following_count` and `:follower_count`. Counts come from each
collection's "totalItems" and default to 0 when that is not an integer.
Failures are returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    summary = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"])
    }

    {:ok, summary}
  else
    {:error, _} = error -> error
    other -> {:error, other}
  end
end
1493
# Returns the counter when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1496
@doc """
Adds remote follow information to `user_data` under `:info` when external
user synchronization is enabled, the actor type check passes and both
collection addresses are present.

`user_data` is returned unchanged when any precondition is false. Any other
failure is logged as an error and `user_data` is returned unchanged as well.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): this checks `user_data[:type]`, while `object_to_user_data/1`
  # in this module emits the type under `:actor_type`. For maps built there
  # the check presumably never passes — confirm the intended key.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, fetched))
  else
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1527
# Decides whether a remote collection is private.
#
# Returns `{:ok, false}` when the "first" page is embedded as a collection
# page or can be fetched as one, and `{:ok, true}` when fetching it is
# answered with 401/403 or when the collection has no "first" entry. Other
# fetch results are returned as `{:error, reason}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1544
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered
object into user attributes.

Returns `{:ok, user_data}`, or `{:error, result}` wrapping whatever the
filter returned when it did not return `{:ok, data}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1552
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes,
enriched with follow information where applicable.

Errors are returned as `{:error, reason}` after being logged: deleted
objects at debug level, rejections at info level, and everything else at
error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # A deleted object is expected, so it is not worth an error-level entry.
    {:error, "Object has been deleted" = reason} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}

    {:error, {:reject, why} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(why)}")
      {:error, rejection}

    {:error, reason} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      {:error, reason}
  end
end
1572
@doc """
Frees up a nickname that is already taken by a user with a different ap_id.

When a user with `data[:nickname]` exists and its ap_id differs from
`data[:ap_id]`, that user is renamed to `"<id>.<nickname>"`. When the
ap_ids are equal only an info message is logged. Returns nil when there is
no nickname or no existing user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = previous <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == previous.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{previous.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = User.remote_user_changeset(previous, %{nickname: "#{previous.id}.#{previous.nickname}"})
    User.update_and_set_cache(renamed)
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
1596
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the remote actor is
fetched: an already cached user is updated with the fetched data, while a
new user is inserted after any nickname clash has been resolved. A failed
fetch is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)
  needs_upgrade = existing && !User.ap_enabled?(existing)

  if needs_upgrade do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if existing do
          changeset = User.remote_user_changeset(existing, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data)

          inserted = Repo.insert(User.remote_user_changeset(data))
          User.set_cache(inserted)
        end

      failure ->
        failure
    end
  end
end
1619
@doc """
Resolves `nickname` through WebFinger and builds the user from the
resulting ap_id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield
a non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1627
# Thread containment check: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1632
# Post-processing hook for a single activity; currently this only runs the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1637
@doc """
Builds a query for "Create" activities with direct visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  from(activity in direct_creates, order_by: [asc: activity.id])
end
1644 end