Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Hashtag
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
38
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`. `to`, `cc` and `bcc` are read from the
# "to", "cc" and "bcc" keys and default to `[]` when the key is absent.
#
# For "Create" activities the actor is added to the recipients and duplicates
# are removed; for every other type the three lists are simply concatenated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` maps a missing actor (`nil`) to `[]`, so an absent "actor"
  # contributes nothing instead of inserting an empty list as a recipient.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = Enum.uniq(to ++ cc ++ bcc ++ actor)
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  {to ++ cc ++ bcc, to, cc}
end
55
# Returns `true` when no actor is given. For a binary actor id, returns `true`
# only if the cached user looked up by that AP id has `is_active: true`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
64
# Checks the embedded object's "content" against the configured
# `[:instance, :remote_limit]`. Returns `true` when the content length is
# within the limit, or when there is no string content to measure.
#
# The guard is `is_binary/1` (rather than "not nil") so that non-string
# content falls through to the permissive clause instead of crashing inside
# `String.length/1`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  String.length(content) <= Config.get([:instance, :remote_limit])
end

defp check_remote_limit(_), do: true
71
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the counter.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` without touching the counter.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
79
# For a "Create" whose object carries "inReplyTo", increments the replies
# count of the replied-to object, but only when the new object is public
# (returns `nil` otherwise). Any other input yields `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_other), do: :noop
90
# Types that are stored as objects (via `Object.create/1`) rather than as
# activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

# Stores the given data and returns `{:ok, stored, meta}`.
#
# Data whose "type" is one of `@object_types` is created as an object.
# Anything else is inserted as an activity, using the `:local` flag that must
# be present in `meta`, and gets an expiration job when applicable.
# Non-matching results of the storage calls are returned unchanged.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
115
# Inserts an activity built from `map`.
#
# Steps, in order: return the already-known activity if `Activity.normalize/1`
# finds one; fill in defaults; verify the actor is active (unless
# `bypass_actor_check`); enforce the remote content limit; run MRF; stop early
# for `fake` inserts; check containment; insert the full object and then the
# activity itself (with expiration handling).
#
# On success the child object (if any) is spliced into the returned activity
# and rich media fetching is started in a task under the concurrency limiter.
# A `fake` insert returns an unsaved activity with the id "pleroma:fakeid".
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data = lazy_put_activity_defaults(map, fake),
       {:actor_check, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(data["actor"])},
       {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data),
       {:ok, inserted} <- insert_activity_with_expiration(data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # The activity already exists; hand it back as-is.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = failure ->
      failure

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = failure ->
      failure
  end
end
168
# Inserts an `%Activity{}` built from the arguments and, on success, passes
# it to `maybe_create_activity_expiration/1`. A failed insert result is
# returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
181
# Creates notifications for the activity, creates or bumps its conversation,
# then streams out the activity followed by the conversation participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
190
# Enqueues a `PurgeExpiredActivity` job when the activity data carries a
# `DateTime` under "expires_at". Returns `{:ok, activity}` on success or when
# no expiration is set; a failed enqueue result is returned unchanged.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
204
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# produce the expected shape, that unexpected value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
212
# Returns the freshly (force-)preloaded participations of the conversation in
# an `{:ok, conversation}` tuple, or `[]` for any other argument.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
220
# Preloads the `:user` association on the participations and pushes them to
# the "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
228
# Streams the participations of the conversation matching the object's
# "context", but only if a latest direct activity id can be found for that
# context (looked up with `user` as both `:user` and `:blocking_user`).
# When no conversation exists, the lookup result is returned unchanged; when
# no such activity exists the result is `nil`. Other arguments yield `:noop`.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
248
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by `Topics.get_activity_topics/1`. Everything else is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
261
# Runs `do_create/2` inside a database transaction. A committed transaction
# yields the inner result; any other transaction outcome is returned as-is.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
268
# Builds the "Create" data and inserts it, then (for real inserts) updates the
# replies count, bumps the actor's note count, notifies/streams and federates.
#
# Short-circuits with `{:ok, activity}` after insertion for fake inserts, and
# after the replies count update when `[:env]` is configured as `:benchmark`.
# `{:error, message}` results roll the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      extra
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
301
# Builds "Listen" activity data from the params, inserts it, then notifies,
# streams and federates. Any step that does not match is returned unchanged.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{
        to: to,
        actor: actor,
        published: params[:published],
        context: context,
        object: object
      },
      extra
    )

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
321
# Runs `do_unfollow/4` inside a database transaction. A committed transaction
# yields the inner result; any other transaction outcome is returned as-is.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
330
# Marks the latest follow activity between the two users as "cancelled",
# inserts the matching unfollow activity, then notifies, streams and
# federates it. Returns `nil` when there is no follow activity to undo and
# rolls the transaction back on `{:error, _}`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
344
# Runs `do_flag/1` inside a database transaction. A committed transaction
# yields the inner result; any other transaction outcome is returned as-is.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
351
# Inserts a report ("flag") activity, notifies/streams it, federates a
# stripped copy, and e-mails the report to superusers.
#
# The activity is addressed to nobody ("to" is empty); "cc" holds the reported
# account's AP id unless `params[:forward]` is explicitly `false`.
# `{:error, _}` results roll the surrounding transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    # Mail every superuser that has an e-mail address and whose AP id differs
    # from `actor`.
    # NOTE(review): `actor` is compared directly against `user.ap_id`; if
    # callers pass a `%User{}` here this comparison never excludes anyone —
    # confirm the intended type.
    User.all_superusers()
    |> Enum.reject(fn user -> user.ap_id == actor or is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
394
# Inserts a "Move" activity from `origin` to `target`, notifies/streams and
# federates it, and enqueues the "move_following" background job.
#
# Requires `origin.ap_id` to be listed in `target.also_known_as`; otherwise an
# error tuple is returned. Other failures are passed through unchanged.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    other -> other
  end
end
420
# Builds the query for "Create" activities in the given `context`, newest
# first.
#
# Recipients are limited to the public collection, plus the AP id and
# followings of `opts[:user]` when a user is given. Blocks, content filters,
# poll votes and `opts[:exclude_id]` are applied through the respective
# helper functions.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients = if user, do: [user.ap_id | User.following(user)] ++ public, else: public

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, user)
    |> restrict_filtered(opts)

  base_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
450
# Executes `fetch_activities_for_context_query/2` and returns all rows.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
457
# Returns the id of the first row of the context query (which orders by id
# descending) restricted to "direct" visibility, or `nil` when there is none.
# Preloading is skipped by default; `opts` may override `:skip_preload`.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([activity], activity.id)
  |> Repo.one()
end
468
# Fetches a page of activities addressed to the public collection. The
# `:user` option is dropped before querying; unlisted activities are excluded
# only when `opts` has `restrict_unlisted: true`.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
478
# Same as `fetch_public_or_unlisted_activities/2`, with `restrict_unlisted`
# forced to `true`.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  strict_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(strict_opts, pagination)
end
485
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed `activity_visibility` equals
# the requested visibility (or is one of the requested visibilities when a
# list is given).
#
# An invalid visibility is logged and the query is returned without the
# restriction, mirroring `exclude_visibility/2`. Previously these branches
# returned the result of `Logger.error/1` (`:ok`), which is not a query and
# broke whatever was piped after this function. The value is logged with
# `inspect/1` so that values without a `String.Chars` implementation (maps,
# nested lists, ...) cannot raise while building the message.
#
# When `opts` has no `:visibility` key the query is returned untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
522
# Removes activities whose computed `activity_visibility` equals the given
# visibility (or is any of the given visibilities when a list is passed via
# `:exclude_visibilities`).
#
# Invalid values are logged and the query is returned unchanged; a `nil` or
# missing option leaves the query untouched without logging.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
565
# Applies the `thread_visibility` SQL check for the given user's AP id.
# Skipped when the third argument or the user has
# `skip_thread_containment: true`, or when no `%User{}` is present.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
580
# Fetches activities by `user` as seen by `reading_user` and returns them in
# reversed order. `params` is extended with `:user` (the reader) and
# `:actor_id` (the author's AP id).
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
595
# Fetches the statuses of `user` visible to `reading_user`, reversed.
# With `total: true` in `params` the result is a keyword list and only its
# `:items` entry is reversed.
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
609
# Fetches "Create" and "Announce" activities authored by `user` for
# `reading_user`. Block and mute filtering for the reader is only enabled
# when the reader does not block `user`. Pagination defaults to `:keyset`
# unless `params[:pagination_type]` is set.
defp fetch_activities_for_user(user, reading_user, params) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
636
# Fetches statuses visible to `reading_user`, reversed. With `total: true`
# in `params` the result is a keyword list and only its `:items` entry is
# reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])
  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
647
# Fetches "Create" and "Announce" activities for the recipients derived from
# `reading_user`, using `:offset` pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
658
# Recipient list used when reading a user's activities: empty in godmode;
# otherwise the public collection plus, when there is a reading user, their
# own AP id and everything they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  if reading_user,
    do: [Constants.as_public(), reading_user.ap_id | User.following(reading_user)],
    else: [Constants.as_public()]
end
668
# Keeps a row only if it is not an "Announce" or if the object's actor differs
# from the filtering user's AP id. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
688
# Restricts to activities with an id greater than `since_id`. An empty
# string or a missing option leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
696
# Requires the object's embedded 'tag' JSON to contain ALL of the given tags
# (jsonb `?&`). A single binary tag is delegated to the "any" variant.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _opts), do: query
713
# Requires the object's embedded 'tag' JSON to contain ANY of the given tags
# (jsonb `?|`). A single binary tag is wrapped into a one-element list.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _opts), do: query
730
# Rejects objects whose embedded 'tag' JSON contains any of the given tags.
# A single binary tag is wrapped into a one-element list. Raises when
# combined with `skip_preload: true`.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _opts), do: query
748
# Requires the object to be linked (through `hashtags_objects`) to ALL of the
# given hashtag names. A one-element list is delegated to the cheaper "any"
# variant; a single binary tag is wrapped into a list first. Raises when
# combined with `skip_preload: true`.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  restrict_hashtag_any(query, %{tag: single_tag})
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_all(query, %{tag_all: [tag]})
end

defp restrict_hashtag_all(query, _opts), do: query
779
# Requires the object to be linked (through `hashtags_objects`) to at least
# one of the given hashtag names. A single binary tag is wrapped into a list.
# Raises when combined with `skip_preload: true`.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^tags,
      object.id
    )
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _opts), do: query
805
# Rejects objects that are linked (through `hashtags_objects`) to any of the
# given hashtag names. A single binary tag is wrapped into a list. Raises
# when combined with `skip_preload: true`.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^tags_reject,
      object.id
    )
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _opts), do: query
831
# Shared failure for filters that need the joined object while preloading
# has been skipped.
defp raise_on_missing_preload, do: raise("Can't use the child object without preloading!")
835
# Restricts to activities whose recipients overlap (`&&`) the given list.
# With a user, activities authored by that user are also kept. An empty
# recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
849
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
855
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _opts), do: query
861
# With an `:actor_id`, keeps only activities whose actor equals it.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
867
# Restricts by the activity's 'type': equality for a binary `:type`, `ANY`
# membership for any other `:type` value.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
877
# With a `:state`, keeps only activities whose 'state' equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
883
# With `:favorited_by`, keeps only rows whose object 'likes' JSON contains the
# given AP id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
892
# With `only_media: true`, keeps only "Create" activities whose object
# 'attachment' differs from an empty list. Raises whenever `:only_media` is
# present together with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
906
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without 'inReplyTo';
#   * `reply_visibility: "self"` keeps non-replies and replies that have the
#     filtering user among the recipients;
#   * `reply_visibility: "following"` applies the conditions spelled out in
#     the SQL comments below.
# Without any of these options the query is left unchanged.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_visibility: "self",
       reply_filtering_user: %User{} = user
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_visibility: "following",
       reply_filtering_user: %User{} = user
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
960
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
966
# Hides activities from muted actors and activities addressed ('to') to muted
# actors (unless authored by the muting user). Muted AP ids come from
# `opts[:muted_users_ap_ids]` or are looked up for the user. Unless
# preloading is skipped, rows with a thread mute entry are hidden as well.
# `with_muted: true` disables the whole filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
993
# Applies the blocking user's user blocks and domain blocks.
#
# Blocked AP ids come from `opts[:blocked_users_ap_ids]` or are looked up for
# the user. The object is joined if the query has no `:object` binding yet.
# The individual conditions are described next to each `where` below.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  joined =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  joined
  # The author is not blocked.
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # No blocked recipient, unless the blocking user is the author.
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # No recipient on a blocked domain.
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  # Not an Announce addressed ('to') to a blocked user.
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # The author's domain is not blocked, unless the author is followed.
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # The object author's domain is not blocked, unless that author is followed.
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
1047
# With `restrict_unlisted: true`, drops activities whose 'cc' contains the
# public collection address.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
1061
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
1067
# Drops "Announce" activities made by actors whose reblogs the muting user
# has muted. The AP ids come from `opts[:reblog_muted_users_ap_ids]` or are
# looked up for the user.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
1084
# With a binary `:instance`, keeps only activities whose actor URL has that
# value as its third '/'-separated part.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _opts), do: query
1093
# Applies the user's composed filter regex (from `Filter.compose_regex/1`):
# rows whose object 'content' matches it case-insensitively are dropped,
# unless the user authored the activity. No regex means no restriction.
# `:blocking_user` is used as the user when `:user` is not a `%User{}`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _opts), do: query
1113
# Filters out objects of type "Answer" unless `include_poll_votes: true` is
# given. The filter needs the `:object` named binding; without it the query is
# returned as-is.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [activity, object: obj], fragment("not(?->>'type' = ?)", obj.data, "Answer"))

    false ->
      query
  end
end
1125
# Filters out objects of type "ChatMessage" unless
# `include_chat_messages: true` is given. The filter needs the `:object` named
# binding; without it the query is returned as-is.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [activity, object: obj],
        fragment("not(?->>'type' = ?)", obj.data, "ChatMessage")
      )

    false ->
      query
  end
end
1137
# Removes activities authored by invisible users unless
# `invisible_actors: true` is given. The invisible users' AP ids are loaded
# eagerly from the database and passed to the query as a parameter.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, & &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1148
# Excludes the activity with the given binary id; any other option shape
# leaves the query unchanged.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1154
# Preloads the activity's object unless the caller asked to skip preloads.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1161
# Preloads the bookmark for `opts[:user]` unless the caller asked to skip
# preloads.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1168
# Preloads report notes only when explicitly requested with
# `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1175
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless the caller asked to skip preloads.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  for_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, for_user)
end
1182
# Orders by activity id when `:order` is `:desc` or `:asc`; any other value
# (or no `:order` at all) leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _opts), do: query
1194
# Normalizes the hashtag names given under :tag, :tag_all and :tag_reject.
# Each option may hold a single name or a list of names; any other value
# (including an absent key) is left untouched.
defp normalize_fetch_activities_query_opts(opts) do
  for tag_key <- [:tag, :tag_all, :tag_reject], reduce: opts do
    acc ->
      tags = acc[tag_key]

      cond do
        is_bitstring(tags) ->
          Map.put(acc, tag_key, Hashtag.normalize_name(tags))

        is_list(tags) ->
          Map.put(acc, tag_key, Enum.map(tags, &Hashtag.normalize_name/1))

        true ->
          acc
      end
  end
end
1209
# Loads the muting user's outgoing relationship AP ids in one call and returns
# three option maps (for restrict_blocked, restrict_muted and
# restrict_muted_reblogs) carrying the preloaded lists. Block relationships
# are only preloaded when the blocking user is the same as the muting user.
# Values already present in `opts` take precedence over the preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1231
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion step driven by `opts`.

Hashtag options (`:tag`, `:tag_all`, `:tag_reject`) are normalized first.
Depending on the `:improved_hashtag_timeline` feature flag, hashtag
restrictions are applied either through the hashtag helpers or through the
embedded-tag helpers.

Returns the composed query; nothing is executed against the activities table
here, although some steps may load auxiliary data (e.g. relationship lists).
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied exactly once: applying it a second time
  # only repeated the same WHERE clause and recomputed the filter regex.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1288
@doc """
Fetches a page of activities addressed to `recipients` or to any of the
memberships returned by `Pleroma.List.memberships/1` for `opts[:user]`.

The paginated result is reversed before being returned, and activities that
reached the user through a list get the user's AP id prepended to "cc".
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  all_recipients = recipients ++ list_memberships

  page =
    all_recipients
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1297
@doc """
Fetches the activities a user has favourited, ordered by the id of the
"Like" activity (newest favourite first) rather than by the favourited
activity itself.

Each returned activity carries its object and uses the like's id as
`pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  # Ordering is done here, so the paginator must not add its own.
  pagination_params = Map.put(params, :skip_order, true)

  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  likes_with_targets
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1315
# For every activity whose non-empty "bcc" mentions one of the user's list
# memberships, prepends the user's AP id to the activity's "cc". Activities
# are returned untouched when there are no memberships or no user.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  for activity <- activities do
    case activity do
      %{data: %{"bcc" => [_ | _] = bcc}} ->
        delivered_via_list? =
          Enum.any?(bcc, fn address -> Enum.member?(list_memberships, address) end)

        if delivered_via_list? do
          put_in(activity.data["cc"], [user_ap_id | activity.data["cc"]])
        else
          activity
        end

      _ ->
        activity
    end
  end
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1331
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1340
@doc """
Fetches a page of activities using the bounded recipient rule: an activity
matches when its recipients overlap `recipients`, or when they overlap
`recipients_with_public` and include the public address.

The paginated result is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)
  Enum.reverse(page)
end
1352
@doc """
Stores `file` through `Upload.store/2` and inserts an object holding the
resulting data, with "actor" added when `opts[:actor]` is present.

Any non-`{:ok, data}` result of the store step is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1361
# Extracts a URL string from an actor's "url" property, which may be a plain
# string, a map with a binary "href", or a list whose first element is one of
# those. Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _unsupported -> nil
  end
end
1373
# Converts a fetched actor document (a string-keyed map) into the attribute
# map used to build a remote user.
#
# The document originates from a remote server, so optional properties are
# read defensively: malformed "icon"/"image" values, "attachment" entries
# that are not PropertyValue maps, and "tag" entries that are not well-formed
# Emoji maps are skipped instead of raising.
defp object_to_user_data(data) do
  avatar = image_from_actor_property(data["icon"])
  banner = image_from_actor_property(data["image"])

  # "attachment" may be absent, null, a single object or a list.
  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Only Emoji tags that carry both a name and an icon URL are usable.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end

# Builds the stored image representation from an actor's "icon"/"image"
# property. Returns nil unless the property is a map with a binary "url".
defp image_from_actor_property(%{"url" => url}) when is_binary(url) do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp image_from_actor_property(_property), do: nil
1462
@doc """
Fetches the user's remote following and followers collections and derives
follow counters and privacy flags from them.

Returns `{:ok, info}` with `:hide_follows`, `:hide_followers`,
`:follower_count` and `:following_count`, or `{:error, reason}`; any
non-error failure value is wrapped into an error tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"])
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1482
# Returns the counter when it is an integer and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1485
@doc """
Merges remotely fetched follow information into `user_data[:info]` when
external user synchronization is enabled, the user type is "Person" or
"Service", and both collection addresses are present.

Always returns a user data map: the input is returned unchanged when a
precondition is false, and also (after logging an error) when anything else
goes wrong.
"""
def maybe_update_follow_information(user_data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  # NOTE(review): this reads `user_data[:type]`; confirm callers provide that
  # key, since actor data elsewhere is exposed under `:actor_type`.
  with {:enabled, true} <- {:enabled, sync_enabled},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {precondition, false}
    when precondition in [:enabled, :user_type_check, :collections_available] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1516
# Decides whether a follow collection is private.
#
# * "first" embedded as a collection page           -> {:ok, false}
# * "first" given as a reference: it is fetched; a page -> {:ok, false},
#   a 401/403 response -> {:ok, true}, anything else -> an error tuple
# * no "first" at all                               -> {:ok, true}
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first_page}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first_page) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_collection), do: {:ok, true}
1533
@doc """
Runs the actor document through `MRF.filter/1` and, when it passes, converts
it into user attributes.

Returns `{:ok, user_data}`; any other filter result is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1541
@doc """
Fetches the remote actor at `ap_id`, converts it into user attributes and
optionally enriches them with follow information.

Returns `{:ok, user_data}` or `{:error, reason}`. Failures are logged at a
level that depends on the reason: debug for deleted objects, info for
rejections, error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = failure ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      failure
  end
end
1561
@doc """
Frees up a nickname that is already taken by a different stored user.

When `data[:nickname]` belongs to an existing user with another AP id, that
user is renamed to "<id>.<nickname>". When the AP ids are equal nothing is
changed and the situation is only logged. Returns `nil` when there is no
binary nickname or no existing user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1585
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through the Transmogrifier.
Otherwise the remote actor is fetched; an existing user is updated with the
fetched data, and a new user is inserted after resolving nickname clashes.
A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if cached_user do
            cached_user
            |> User.remote_user_changeset(data)
            |> User.update_and_set_cache()
          else
            maybe_handle_clashing_nickname(data)

            data
            |> User.remote_user_changeset()
            |> Repo.insert()
            |> User.set_cache()
          end

        failure ->
          failure
      end
  end
end
1608
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id it
yields. Returns `{:error, "No AP id in WebFinger"}` when the lookup does not
provide a non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1616
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1621
# Post-processing hook for a single activity; currently it only applies the
# broken-thread check for the given user.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1626
@doc """
Returns a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1633 end