Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36
# Computes the addressing of an activity map.
#
# Returns `{recipients, to, cc}`. `to` and `cc` are the raw "to"/"cc" values of
# `data` (defaulting to `[]`); `recipients` is the combined address list.
#
# For "Create" activities the actor is appended and duplicates are removed.
# A missing actor contributes nothing: the previous `[]` default was wrapped
# into `[[]]`, which leaked an empty list into the recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # `List.wrap/1` turns a missing (nil) actor into [] and a single actor into [actor].
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: plain concatenation of to/cc/bcc, duplicates kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
53
# Tells whether the given actor may produce activities.
# A nil actor passes; an AP id passes only when the cached user exists and is
# not deactivated.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
62
# Checks the embedded object's content length against the configured
# `[:instance, :remote_limit]`. Activities without non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
69
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
73
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
77
# For a "Create" whose object has an "inReplyTo", increments the replies count
# of the replied-to object, but only when the new object is public (returns nil
# otherwise). Any other input is a no-op.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
88
# Types that are stored as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

# Persisting callback, object variant: creates the object and passes `meta`
# through. Anything other than `{:ok, object}` from `Object.create/1` is
# returned unchanged.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Persisting callback, activity variant: inserts an Activity row addressed to
# the computed recipients and schedules its expiration when applicable.
# `meta` must contain `:local` (`Keyword.fetch!/2` raises otherwise).
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
113
# Inserts an activity described by `map`.
#
# Pipeline: skip if `Activity.normalize/1` already finds the activity, fill in
# defaults, check the actor (unless bypassed), check the remote content limit,
# run MRF, compute recipients, then (unless `fake`) run containment, store the
# embedded object and insert the activity row.
#
# With `fake: true` nothing is written: a struct with the id "pleroma:fakeid"
# is built and returned instead.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       {_, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(prepared["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media is fetched in a separate task, throttled by ConcurrentLimiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = containment_error ->
      containment_error

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error
  end
end
166
# Inserts the Activity row and, on success, schedules its expiration if the
# data asks for one. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
179
# Creates notifications for the activity, creates/bumps its conversation, then
# streams out the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
188
# Enqueues a PurgeExpiredActivity job when the activity data carries an
# "expires_at" DateTime. Returns `{:ok, activity}` on success (or when there is
# nothing to schedule); an enqueue failure is returned unchanged.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
202
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`, or whichever step's non-matching
# value came first.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conversation} = result <- Conversation.create_or_bump_for(activity),
       %User{} = user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(user, conversation)
    result
  end
end
210
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  preloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(preloaded, :participations)
end

defp get_participations(_), do: []
218
# Streams the given participations (with their users preloaded) on the
# "participation" topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
226
# Streams the participations of the conversation that belongs to the object's
# context, but only if `user` can still see at least one direct activity in it.
# Returns the non-matching lookup result when no conversation exists.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)

      visibility_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, visibility_opts)

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
244
# Streams Create/Announce/Delete activities to the topics computed for them;
# every other activity is ignored.
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
255
# Runs `do_create/2` inside a transaction and unwraps a successful transaction
# result; a rolled-back transaction is returned as it comes from Repo.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
262
# Builds and inserts a Create activity, then (for real, non-benchmark inserts)
# updates reply/note counters, notifies, streams and federates.
# Any `{:error, reason}` rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  create_data = make_create_data(base, extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
295
# Builds a Listen activity from `params`, inserts it, then notifies, streams
# and federates it.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
315
# Transactional wrapper around `do_unfollow/4`; unwraps a successful
# transaction result.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    other -> other
  end
end
324
# Marks the latest follow activity as "cancelled" and emits the matching
# unfollow activity. Returns nil when there is no follow to undo; an
# `{:error, reason}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
338
# Transactional wrapper around `do_flag/1`; unwraps a successful transaction
# result.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
345
# Creates a Flag (report) activity, federates a stripped copy of it and mails
# the report to every superuser that has an email address.
# The reported account is put in "cc" only when forwarding is enabled.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(&is_nil(&1.email))
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
387
# Emits a Move activity from `origin` to `target` and enqueues the
# "move_following" background job. Refused unless the target lists the origin
# in its `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  job_args = %{"origin_id" => origin.id, "target_id" => target.id}

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
413
# Builds the query for the Create activities of one context (thread), newest
# first. Visibility is limited to public addressing plus, when `opts[:user]`
# is given, that user and whatever `User.following/1` returns for them.
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  reader = opts[:user]

  recipients =
    if reader do
      [reader.ap_id | User.following(reader)] ++ public
    else
      public
    end

  in_context =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, reader)
    |> restrict_filtered(opts)
    |> where(
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  in_context
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
443
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
450
# Returns the id of the newest direct-visibility activity in the context, or
# nil. Preloading is skipped by default since only the id is selected.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
461
# Fetches a page of publicly addressed activities. `:user` is dropped from
# `opts` before the query is built.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public = [Constants.as_public()]

  public
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
471
# Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
# forced to true.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
478
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose computed visibility (the SQL function
# `activity_visibility`) equals `opts[:visibility]`, or is one of them when a
# list is given.
#
# An invalid visibility is logged and the query is returned unrestricted.
# The invalid branches used to return `Logger.error/1`'s `:ok`, which broke any
# pipeline the result was fed into; this now mirrors `exclude_visibility/2`.
# `visibility: nil` is treated as "no restriction" and is no longer logged.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
515
# Removes activities whose computed visibility matches
# `opts[:exclude_visibilities]` (a single visibility or a list of them).
# Invalid values are logged and leave the query untouched.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, &(&1 in @valid_visibilities))

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
558
# Applies the SQL `thread_visibility` check for the reading user, unless thread
# containment is skipped via the third argument or on the user themselves.
# Without a user the query is left alone.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
573
# Fetches activities authored by `user` as seen by `reading_user`, without
# restricting the activity type. The fetched list is reversed before returning.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
588
# Fetches the Create/Announce activities of `user` as seen by `reading_user`.
# Block and mute filtering for the reader is applied only when the reader does
# not block `user`. The fetched list is reversed before returning.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base
    else
      Map.merge(base, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
614
# Fetches Create/Announce activities visible to `reading_user` using offset
# pagination. The fetched list is reversed before returning.
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
626
# Recipient list used to scope user timelines.
# Godmode yields an empty list (which `restrict_recipients/3` treats as "no
# restriction"); otherwise public plus, for a logged-in reader, the reader and
# whatever `User.following/1` returns for them.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
636
# Drops Announces whose announced object was authored by the filtering user.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
656
# Keeps only activities with an id greater than `since_id`; an empty string or
# a missing key means no restriction.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
664
# Excludes activities whose object's "tag" array contains any rejected tag.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

# A single tag is handled as a one-element list.
defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_tag_reject(query, %{tag_reject: [tag_reject]})
end

defp restrict_tag_reject(query, _), do: query
681
# Keeps activities whose object's "tag" array contains every requested tag.
# Needs the joined object, so it raises when preloading was skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

# "All of one tag" is delegated to the any-of filter.
defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_tag(query, %{tag: tag})
end

defp restrict_tag_all(query, _), do: query
698
# Keeps activities whose object's "tag" array contains at least one of the
# requested tags. Needs the joined object, so it raises when preloading was
# skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  )
end

# A single tag is handled as a one-element list.
defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  restrict_tag(query, %{tag: [tag]})
end

defp restrict_tag(query, _), do: query
715
# Applies the :tag (any), :tag_all and :tag_reject filters through the
# hashtags association, using one grouped left join and `having` clauses.
# Without any tag condition the query is returned untouched; with conditions
# but without preloading it raises.
defp restrict_hashtag(query, opts) do
  [tag_any, tag_all, tag_reject] =
    Enum.map([:tag, :tag_all, :tag_reject], &List.wrap(opts[&1]))

  # True when at least one of the three lists holds a truthy element.
  has_conditions = Enum.any?(tag_any ++ tag_all ++ tag_reject)

  cond do
    not has_conditions ->
      query

    opts[:skip_preload] ->
      raise_on_missing_preload()

    true ->
      joined =
        query
        |> group_by_all_bindings()
        |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)

      joined
      |> maybe_restrict_hashtag_any(tag_any)
      |> maybe_restrict_hashtag_all(tag_all)
      |> maybe_restrict_hashtag_reject_any(tag_reject)
  end
end
740
# Groups by all bindings to allow aggregation on hashtags.
# The number of positional bindings to group on is derived from the number of
# named bindings already present on the query.
defp group_by_all_bindings(query) do
  # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
  case Enum.count(query.aliases) do
    4 ->
      from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])

    3 ->
      from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])

    2 ->
      from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])

    _ ->
      from([a, o] in query, group_by: [a.id, o.id])
  end
end
758
# Adds a `having` that requires the aggregated hashtag names to overlap `tags`;
# an empty tag list is a no-op. Expects the :hashtag named binding.
defp maybe_restrict_hashtag_any(query, []), do: query

defp maybe_restrict_hashtag_any(query, tags) do
  from([hashtag: hashtag] in query,
    having: fragment("array_agg(?) && (?)", hashtag.name, ^tags)
  )
end
770
# Adds a `having` that requires the aggregated hashtag names to contain every
# element of `tags`; an empty tag list is a no-op. Expects the :hashtag named
# binding.
defp maybe_restrict_hashtag_all(query, []), do: query

defp maybe_restrict_hashtag_all(query, tags) do
  from([hashtag: hashtag] in query,
    having: fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
  )
end
782
# Adds a `having` that rejects rows whose aggregated hashtag names overlap
# `tags`; an empty tag list is a no-op. Expects the :hashtag named binding.
defp maybe_restrict_hashtag_reject_any(query, []), do: query

defp maybe_restrict_hashtag_reject_any(query, tags) do
  from([hashtag: hashtag] in query,
    having: fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
  )
end
794
# Standalone variant of the reject filter: groups the query, left-joins the
# object's hashtags and rejects rows whose hashtags overlap the rejected tags.
# Raises when preloading was skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
  grouped = group_by_all_bindings(query)

  with_hashtags =
    join(grouped, :left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)

  having(
    with_hashtags,
    [hashtag: hashtag],
    fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
  )
end

# A single tag is handled as a one-element list.
defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
814
# Requires every listed hashtag by stacking one `restrict_hashtag_any/2`
# filter (and therefore one join) per tag. Raises when preloading was skipped.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
  for tag <- tags, reduce: query do
    acc -> restrict_hashtag_any(acc, %{tag: tag})
  end
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: tag})
end

defp restrict_hashtag_all(query, _), do: query
832
# Keeps activities whose object is associated with at least one of the given
# hashtags, via a join on the hashtags association. Raises when preloading was
# skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
  from([_activity, object] in query,
    join: hashtag in assoc(object, :hashtags),
    where: hashtag.name in ^tags
  )
end

# A single tag is handled as a one-element list.
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
850
# Shared failure for filters that need the joined object while the caller asked
# to skip preloading.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
854
# Restricts the query to activities addressed to any of `recipients`.
# An empty recipient list means no restriction. With a user, activities
# authored by that user are included as well.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  # NOTE(review): `or_where` ORs against every `where` accumulated so far, not
  # just the recipients check; confirm this interplay with earlier filters is
  # intended.
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
868
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
874
# Keeps only activities whose actor equals `opts[:actor_id]`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
880
# Filters on the activity "type": an exact match for a string, `= ANY(...)`
# for any other given value (expected to be a list of types).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
890
# Keeps only activities whose data "state" equals `opts[:state]`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
896
# Keeps only activities whose object's "likes" contains the given AP id.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
905
# With `only_media: true`, keeps only Create activities whose object's
# "attachment" is not the empty list. Raises when preloading was skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
919
# Reply filtering:
#   * `exclude_replies: true`        - drop every reply;
#   * `reply_visibility: "self"`      - keep non-replies and replies addressed
#                                       to the filtering user;
#   * `reply_visibility: "following"` - see the SQL comments in the fragment.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  # The filtering user plus the cached AP ids of their friends.
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create' -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
        -- unless they are the author (because authors
        -- are also part of the recipients). This leads
        -- to a bug that self-replies by friends won't
        -- show up.
        OR ? = ? -- The actor is us
        """,
        activity.data,
        object.data,
        ^self_and_friends,
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _), do: query
973
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
979
# Hides activities by muted actors and activities addressed ("to") to muted
# actors, except those the muting user authored. Unless preloading was skipped,
# rows that have a thread mute (non-nil :thread_mute user_id) are hidden too.
# `with_muted: true` disables the filter entirely.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Callers may pass the mute list in; otherwise it is looked up for the user.
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    from([thread_mute: tm] in without_muted, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1006
# Applies the blocking user's user blocks and domain blocks.
# The object is joined on demand because the last condition inspects the
# object's actor. Accounts in `User.get_friends_ap_ids/1` are exempt from the
# domain-block conditions.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # Actor is not blocked.
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # No blocked recipient, unless the blocking user is the author.
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # No recipient on a blocked domain.
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  # Not an Announce addressed ("to") to a blocked account.
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # Activity actor is not on a blocked domain, unless in the friends list.
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Object actor is not on a blocked domain, unless in the friends list.
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1060
# With `restrict_unlisted: true`, drops activities that carry the public
# address in their "cc" (a missing "cc" counts as empty).
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1074
# With `pinned: true`, keeps only the activities whose id is among
# `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
1080
# Hides Announces made by actors whose reblogs the muting user has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # Callers may pass the list in; otherwise it is looked up for the user.
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1097
# Keeps only activities whose actor URL has `instance` as its third
# '/'-separated part (the host). The SQL refers to the `actor` column by its
# bare name.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
1106
# Hides activities whose object content matches (case-insensitively) the regex
# composed by `Filter.compose_regex/1` for the user, except the user's own
# activities. No regex means no restriction. `:blocking_user` is used when
# `:user` is not given.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1126
# Unless `include_poll_votes: true` is set, removes activities whose joined
# object is of type "Answer". The filter needs the `:object` named binding, so
# queries without it pass through unchanged.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: object], fragment("not(?->>'type' = ?)", object.data, "Answer"))

    false ->
      query
  end
end
1138
# Unless `include_chat_messages: true` is set, removes activities whose joined
# object is of type "ChatMessage". The filter needs the `:object` named
# binding, so queries without it pass through unchanged.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [object: object],
        fragment("not(?->>'type' = ?)", object.data, "ChatMessage")
      )

    false ->
      query
  end
end
1150
# Unless `invisible_actors: true` is set, loads the AP ids of all users
# flagged as invisible and excludes activities authored by any of them.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1161
# Leaves out the single activity identified by `:exclude_id` (binary ids only);
# any other options leave the query unchanged.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1167
# Adds the object preload via `Activity.with_preloaded_object/1` unless the
# caller opted out with `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1174
# Adds the bookmark preload for `opts[:user]` via
# `Activity.with_preloaded_bookmark/2` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1181
# Adds the report-notes preload via `Activity.with_preloaded_report_notes/1`,
# but only when explicitly requested with `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1188
# Unless `skip_preload: true` is set, delegates to
# `Activity.with_set_thread_muted_field/2`, preferring `opts[:muting_user]`
# and falling back to `opts[:user]`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1195
# Orders the query by id when `:order` is `:desc` or `:asc`; any other value
# (or no `:order` at all) leaves the ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1207
# Looks up the muting user's outgoing relationship AP ids in one call and
# returns three option maps
# `{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}`.
# Each map is `opts` plus the matching preloaded list, unless `opts` already
# carries that key (explicit values in `opts` win).
#
# Block relationships are only preloaded when `:blocking_user` is set and is
# the same user as `:muting_user`.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1229
@doc """
Builds the Ecto query used to fetch activities for `recipients`.

Starting from `Activity`, the optional preloads and ordering are applied
first, followed by the `restrict_*` and `exclude_*` filters driven by `opts`.
Hashtag filtering is appended last; which implementation is used depends on
`Config.object_embedded_hashtags?/0` and on the
`[:instance, :improved_hashtag_timeline]` setting.

Returns the composed query without executing it.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied exactly once: piping it a second time only
  # repeats the same WHERE clause and re-invokes `Filter.compose_regex/1`.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  cond do
    Config.object_embedded_hashtags?() ->
      query
      |> restrict_tag(opts)
      |> restrict_tag_reject(opts)
      |> restrict_tag_all(opts)

    # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
    Config.get([:instance, :improved_hashtag_timeline]) == :join ->
      query
      |> distinct([activity], true)
      |> restrict_hashtag_any(opts)
      |> restrict_hashtag_all(opts)
      |> restrict_hashtag_reject_any(opts)

    true ->
      restrict_hashtag(query, opts)
  end
end
1290
@doc """
Fetches one page of activities for `recipients`.

The result of `Pleroma.List.memberships/1` for `opts[:user]` is appended to
`recipients` before the query is built. The paginated result is reversed and
then passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1299
@doc """
Fetches the activities a user has favourited, ordered by the id of the
corresponding "Like" activity (descending), i.e. by when they were favourited.

Each returned activity carries its object and uses the Like's id as
`pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already fixed above, so pagination is told not to add its own.
  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1317
# For every activity whose non-empty "bcc" contains at least one of the given
# `list_memberships`, prepends the user's AP id to the activity's "cc".
# Activities without a matching "bcc" entry are returned unchanged, and the
# whole list is returned as is when there are no memberships or no `%User{}`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a single value; List.wrap/1 guarantees
        # the result is a proper list instead of `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1333
# Restricts the query to activities whose recipients overlap `recipients`, or
# overlap `recipients_with_public` while also including the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1342
@doc """
Fetches one page of activities using the bounded recipient check.

The base query is built with an empty recipient list; recipient matching is
done by `fetch_activities_bounded_query/3` with `recipients` and
`recipients_with_public`. The paginated result is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1354
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data, with "actor" set when `opts[:actor]` is present.

Anything other than `{:ok, data}` from `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1363
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map with a binary "href", or a list (its first element is
# examined recursively). Anything else, including an empty list, yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
1375
# Builds the image map stored for avatar/banner from an actor's "icon" or
# "image" value. Returns nil unless the value is a map with a truthy "url".
defp user_image_from_object(%{"url" => url}) when url not in [nil, false] do
  %{
    "type" => "Image",
    "url" => [%{"href" => url}]
  }
end

defp user_image_from_object(_), do: nil

# Converts a (remote) actor object into the attribute map used to build or
# update a user: ap_id, uri, avatar/banner, profile fields, custom emoji,
# follower/following addresses, keys, inboxes and the derived nickname.
#
# The actor JSON comes from other servers, so malformed parts are skipped
# instead of raising:
#   * "icon"/"image" that are not maps with a "url" produce no avatar/banner;
#   * "attachment" entries that are not "PropertyValue" maps are ignored;
#   * "tag" entries are only used when they are "Emoji" maps that carry both
#     an icon url and a binary name.
# "attachment" and "tag" may be missing, nil, a single object or a list.
defp object_to_user_data(data) do
  avatar = user_image_from_object(data["icon"])
  banner = user_image_from_object(data["image"])

  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # These values are read from the object as received, before
  # Transmogrifier.maybe_fix_user_object/1 is applied.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
1464
@doc """
Fetches the remote following and follower collections of `user` and returns
`{:ok, map}` with `:follower_count`, `:following_count`, `:hide_follows` and
`:hide_followers`.

Counts come from each collection's "totalItems" (0 when it is not an
integer); the `hide_*` flags come from `collection_private/1`. Any failing
step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_information = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_information}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1484
# Returns `counter` when it is an integer and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1487
@doc """
Merges remotely fetched follow information into `user_data[:info]`.

The fetch only happens when `[:instance, :external_user_synchronization]` is
`true`, `user_data[:type]` is "Person" or "Service", and both the following
and follower addresses are set. When one of those checks is `false` the input
is returned unchanged; any other failure is logged as an error and the input
is returned unchanged as well.

NOTE(review): this checks `user_data[:type]`, whereas `object_to_user_data/1`
in this module emits `:actor_type` — confirm which key callers are expected
to provide.
"""
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {failed_check, false}
    when failed_check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1518
# Decides whether a follower/following collection is private, returning
# `{:ok, boolean}` or `{:error, reason}`:
#   * an embedded "first" page of a collection-page type -> not private;
#   * otherwise "first" is fetched: a collection page -> not private, a 401 or
#     403 response -> private, any other result -> error;
#   * no "first" at all -> private.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1535
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user attributes.

Returns `{:ok, user_data}`; any other result of the filter is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1543
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes,
passing them through `maybe_update_follow_information/1`.

Returns `{:ok, user_data}` or `{:error, reason}`. The log level of a failure
depends on its reason: debug for a deleted object, info for a rejection,
error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # If this has been deleted, only log a debug and not an error
    {:error, "Object has been deleted" = deleted} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(deleted)}")
      {:error, deleted}

    {:error, {:reject, reason} = rejection} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
      {:error, rejection}

    {:error, failure} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      {:error, failure}
  end
end
1563
@doc """
Frees up `data[:nickname]` when an existing user with a different AP id
already holds it, by renaming that user to "<id>.<nickname>".

If the existing user has the same AP id, nothing is changed and the situation
is only logged. Returns `nil` when there is no binary nickname or no existing
user with it.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()
    end
  else
    _no_clash -> nil
  end
end
1587
@doc """
Creates or refreshes the user identified by `ap_id` from its remote actor.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched;
an already cached user is updated with the fetched data, while a new user is
inserted after resolving a possible nickname clash. A failed fetch is
returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  cond do
    cached_user && !User.ap_enabled?(cached_user) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      with {:ok, user_data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, user_data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(user_data)

          user_data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end
      end
  end
end
1610
@doc """
Resolves `nickname` through WebFinger and builds the user from the AP id it
yields. Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or
carries no AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1618
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1623
# Post-processing hook for a single activity; currently this only runs the
# broken-thread check for `user`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1628
@doc """
Returns a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{type: "Create"})
  direct_messages = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_messages, [activity], asc: activity.id)
end
1635 end