f5563b0fde7fe121145e7c0401df5e0a39a0c2ed
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Computes the recipients of an activity.
#
# Returns `{recipients, to, cc}`: `recipients` is the concatenation of the
# "to", "cc" and "bcc" entries (each defaulting to `[]` when the key is
# absent), while `to` and `cc` are handed back untouched.
#
# For "Create" activities the actor is appended as a recipient and the
# resulting list is de-duplicated.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 maps a missing (nil) actor to `[]`. The previous
  # `[Map.get(data, "actor", [])]` produced `[[]]`, i.e. an empty list ended up
  # *inside* the recipients when the activity carried no actor.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# Tells whether the given actor may author activities.
#
# A nil actor is accepted. For a binary AP id the cached user is looked up:
# the answer is the negation of its `deactivated` flag, or `false` when the
# lookup does not yield a user.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
63
# Checks the object's "content" against the `[:instance, :remote_limit]`
# setting. Activities without an embedded object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
70
@doc """
Increases the note count of `actor` when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Decreases the note count of `actor` when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# For a "Create" whose object carries an "inReplyTo" key, bumps the replies
# counter of the replied-to object, but only when the new object is public
# (nil is returned otherwise). Any other input is a no-op.
defp increase_replies_count_if_reply(%{
       "object" => %{"inReplyTo" => reply_ap_id} = object,
       "type" => "Create"
     }) do
  if is_public?(object), do: Object.increase_replies_count(reply_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@doc """
Persists `object` and returns `{:ok, stored, meta}`.

Data whose "type" is one of `@object_types` is stored through
`Object.create/1`. Everything else is inserted as an `Activity` (the `:local`
key of `meta` is required) and, if applicable, gets an expiration job.
Failures of the underlying calls are returned unchanged.
"""
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  record = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(record),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
# Inserts the activity described by `map`.
#
#   * `local` - stored in the `local` field of the resulting activity.
#   * `fake` - when true nothing is written; an %Activity{} with the id
#     "pleroma:fakeid" is built and returned instead.
#   * `bypass_actor_check` - when true the actor-is-active check is skipped.
#
# Returns `{:ok, activity}` on success, or one of the error shapes produced by
# the `else` clauses below.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # Only continue when Activity.normalize/1 yields nil; an %Activity{} result
  # is returned as-is by the first `else` clause.
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # Fake inserts leave the pipeline here, carrying the filtered map and
       # the recipients into the `{:fake, true, ...}` clause.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # The rich media fetch is started in a separate task so the insert does not
    # wait for it.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    %Activity{} = activity ->
      {:ok, activity}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error

    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      # Unlike the real path, the rich media fetch runs inline here.
      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    # The whole `{:reject, _}` tuple becomes the error reason.
    {:reject, _} = e ->
      {:error, e}
  end
end
167
# Inserts an activity row built from `data`, `local` and `recipients` and, on
# success, hands it to maybe_create_activity_expiration/1. A failed insert is
# returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  inserted =
    Repo.insert(%Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    })

  case inserted do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# Enqueues a PurgeExpiredActivity job when the activity data carries a
# `%DateTime{}` under "expires_at". Returns `{:ok, activity}` when the job was
# enqueued or when no expiration applies; an enqueue failure is returned as-is.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation of `activity` and marks it as read for the
# user cached under the AP id `actor`. Returns `{:ok, conversation}`; when
# either lookup does not match, its result is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    other ->
      other
  end
end
211
# Returns the freshly (force-)preloaded participations of a conversation
# wrapped in `{:ok, conversation}`, or an empty list for any other input.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
219
@doc """
Preloads the `:user` of the given participations and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
227
@doc """
Streams the participations of the conversation that `object`'s "context"
belongs to, provided a direct activity is still found for that context with
`user` as both `:user` and `:blocking_user`.

Returns `:noop` for inputs that are not an `%Object{}` with a "context".
"""
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

      if last_activity_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Everything else yields `:noop`.
"""
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
@doc """
Runs the "Create" pipeline for `params` inside a database transaction and
returns its result. A failed transaction is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
267
# Builds the "Create" data from `params`, inserts it and runs the follow-up
# steps (reply counter, note counter, notifications/streaming, federation).
# It is invoked from within the transaction opened by create/2; an
# `{:error, message}` result triggers `Repo.rollback(message)`.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  # In the :benchmark env everything after the reply counter is skipped.
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       # Fake activities are returned right after the insert.
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
300
@doc """
Builds a "Listen" activity from `params`, inserts it, then notifies, streams
and federates it.

`params[:local]` counts as false only when it is exactly `false`. Any step
that does not match is returned unchanged.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
@doc """
Runs the unfollow pipeline for `follower` and `followed` inside a database
transaction and returns its result. A failed transaction is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Marks the latest follow activity between `follower` and `followed` as
# "cancelled", inserts the matching unfollow activity, then notifies, streams
# and federates it.
#
# Returns `{:ok, activity}`, or nil when no follow activity exists. An
# `{:error, error}` result triggers `Repo.rollback(error)`; this function is
# called from within the transaction opened by unfollow/4.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
       {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, error} -> Repo.rollback(error)
  end
end
343
@doc """
Runs the report ("Flag") pipeline for `params` inside a database transaction
and returns its result. A failed transaction is returned as-is.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Inserts a report activity built from `params`, notifies/streams it,
# federates a stripped copy and e-mails every superuser that has an address.
#
# Called from within the transaction opened by flag/1; an `{:error, error}`
# result triggers `Repo.rollback(error)`.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = !(params[:local] == false)
  forward = !(params[:forward] == false)

  additional = params[:additional] || %{}

  # The reported account is put in "cc" only when the report is forwarded;
  # "to" is always empty.
  additional =
    if forward do
      Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
    else
      Map.merge(additional, %{"to" => [], "cc" => []})
    end

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       # Local notification/streaming uses the full activity, federation the
       # stripped one.
       _ <- notify_and_stream(activity),
       :ok <-
         maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.filter(fn user -> not is_nil(user.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
392
@doc """
Inserts a "Move" activity from `origin` to `target`, notifies, streams and
federates it, and enqueues the "move_following" background job.

`origin.ap_id` has to be listed in `target.also_known_as`, otherwise an error
tuple is returned. Any other non-matching step is returned unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    err -> err
  end
end
418
@doc """
Builds the query for the "Create" activities of `context`, newest first.

`opts` may be a map or a keyword list. Recognised here: `:user` (widens the
recipients to the user, the result of `User.following/1` for them, and the
public collection); the remaining keys are consumed by the restriction helpers
the query is piped through.
"""
def fetch_activities_for_context_query(context, opts) do
  # The restriction helpers below pattern-match on maps, so a keyword list
  # would fall into their catch-all clauses and silently skip the block,
  # filter and exclude restrictions. Normalise once up front.
  opts = Map.new(opts)
  public = [Constants.as_public()]

  recipients =
    if opts[:user],
      do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
      else: public

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
448
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2` for
`context` and `opts`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
455
@doc """
Returns the id of the newest direct-visibility activity of `context`, or `nil`
when there is none.

`opts` may be a map or a keyword list; `skip_preload: true` is used unless the
caller overrides it.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Map.merge/2 raises on keyword lists although the spec allows them, so the
  # options are converted to a map first (a no-op for maps).
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
466
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` key is dropped from `opts` before querying; unlisted activities
are filtered out only when `opts` asks for it via `:restrict_unlisted`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  query = fetch_activities_query([Constants.as_public()], opts)

  query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
476
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
483
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose computed visibility (SQL function
# `activity_visibility`) equals `visibility`, or is any of them when a list
# is given.
#
# An invalid visibility is logged and the query is returned unrestricted,
# mirroring exclude_visibility/2. These branches used to return the result of
# `Logger.error/1` instead of a query, which broke the pipeline of the caller.
# A nil visibility is treated as "no restriction" without logging.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
520
# Drops activities whose computed visibility (SQL function
# `activity_visibility`) equals the given `:exclude_visibilities` value, or is
# any of them when a list is given. Invalid values are logged and leave the
# query untouched; nil or a missing key is a silent no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, &(&1 in @valid_visibilities))

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
563
# Applies the SQL function `thread_visibility` for the requesting user, unless
# thread containment is skipped through the third argument or the user's own
# `skip_thread_containment` flag. Without a user the query is left untouched.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
578
@doc """
Fetches the activities authored by `user` as seen by `reading_user`, without
restricting the activity type. The result of `fetch_activities/2` is reversed.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
593
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

Block and mute filtering on behalf of `reading_user` is enabled only when
`reading_user` does not block `user`. Pagination defaults to `:keyset` and can
be changed through `params[:pagination_type]`. The result of
`fetch_activities/3` is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = params[:pagination_type] || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, pagination_type)
  |> Enum.reverse()
end
621
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination. The result of `fetch_activities/3` is reversed.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
633
# Recipient list used when reading a user's activities: empty in godmode, the
# public collection only without a reading user, otherwise the public
# collection plus the reader and the result of User.following/1 for them.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user})
     when reading_user in [nil, false],
     do: [Constants.as_public()]

defp user_activities_recipients(%{reading_user: reading_user}) do
  [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
end
643
# Hides "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, hence the raise when
# preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
663
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _), do: query
671
# Rejects activities whose object "tag" array contains any of the
# `:tag_reject` tags (a single binary is treated as a one-element list).
# Raises when preloading of the object is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject),
  do: restrict_tag_reject(query, %{tag_reject: [tag_reject]})

defp restrict_tag_reject(query, _), do: query
688
# Keeps activities whose object "tag" array contains every `:tag_all` tag.
# A single binary tag is delegated to restrict_tag/2. Raises when preloading
# of the object is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_tag(query, %{tag: tag})

defp restrict_tag_all(query, _), do: query
705
# Keeps activities whose object "tag" array contains any of the `:tag` tags
# (a single binary is treated as a one-element list). Raises when preloading
# of the object is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  )
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag),
  do: restrict_tag(query, %{tag: [tag]})

defp restrict_tag(query, _), do: query
722
# Applies the `:tag` (any), `:tag_all` and `:tag_reject` conditions through a
# left join on the object's hashtags combined with `having` clauses.
# Without any condition the query is returned untouched; with conditions but
# `:skip_preload` set it raises.
defp restrict_hashtag(query, opts) do
  [tag_any, tag_all, tag_reject] =
    Enum.map([:tag, :tag_all, :tag_reject], fn key -> List.wrap(opts[key]) end)

  has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?/1)

  cond do
    !has_conditions ->
      query

    opts[:skip_preload] ->
      raise_on_missing_preload()

    true ->
      grouped = group_by_all_bindings(query)

      grouped
      |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
      |> maybe_restrict_hashtag_any(tag_any)
      |> maybe_restrict_hashtag_all(tag_all)
      |> maybe_restrict_hashtag_reject_any(tag_reject)
  end
end
747
# Groups by all bindings to allow aggregation on hashtags
#
# The number of entries in `query.aliases` decides how many positional
# bindings are grouped by their `id`: 4 aliases -> 5 bindings, 3 -> 4, 2 -> 3,
# anything else -> the first two bindings.
defp group_by_all_bindings(query) do
  # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
  cond do
    Enum.count(query.aliases) == 4 ->
      from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])

    Enum.count(query.aliases) == 3 ->
      from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])

    Enum.count(query.aliases) == 2 ->
      from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])

    true ->
      from([a, o] in query, group_by: [a.id, o.id])
  end
end
765
# Adds a `having` clause requiring the aggregated hashtag names to overlap
# with `tags`; an empty tag list is a no-op.
defp maybe_restrict_hashtag_any(query, []), do: query

defp maybe_restrict_hashtag_any(query, tags) do
  query
  |> having([hashtag: hashtag], fragment("array_agg(?) && (?)", hashtag.name, ^tags))
end
777
# Adds a `having` clause requiring the aggregated hashtag names to contain all
# of `tags`; an empty tag list is a no-op.
defp maybe_restrict_hashtag_all(query, []), do: query

defp maybe_restrict_hashtag_all(query, tags) do
  query
  |> having([hashtag: hashtag], fragment("array_agg(?) @> (?)", hashtag.name, ^tags))
end
789
# Adds a `having` clause requiring the aggregated hashtag names not to overlap
# with `tags`; an empty tag list is a no-op.
defp maybe_restrict_hashtag_reject_any(query, []), do: query

defp maybe_restrict_hashtag_reject_any(query, tags) do
  query
  |> having([hashtag: hashtag], fragment("not(array_agg(?) && (?))", hashtag.name, ^tags))
end
801
# Rejects activities whose object has any of the `:tag_reject` hashtags, using
# a left join on the object's hashtags plus a `having` clause. A single binary
# is treated as a one-element list. Raises when preloading is skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
  grouped = group_by_all_bindings(query)

  joined =
    join(grouped, :left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)

  having(
    joined,
    [hashtag: hashtag],
    fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject),
  do: restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_hashtag_reject_any(query, _), do: query
821
# Requires every `:tag_all` hashtag by applying restrict_hashtag_any/2 once
# per tag. Raises when preloading is skipped.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
  Enum.reduce(tags, query, &restrict_hashtag_any(&2, %{tag: &1}))
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_hashtag_any(query, %{tag: tag})

defp restrict_hashtag_all(query, _), do: query
839
# Keeps activities whose object is joined to a hashtag named in `:tag`
# (a single binary is treated as a one-element list). Raises when preloading
# is skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
  query
  |> join(:inner, [_activity, object], hashtag in assoc(object, :hashtags))
  # `[..., hashtag]` addresses the last binding, i.e. the join added just above.
  |> where([..., hashtag], hashtag.name in ^tags)
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_hashtag_any(query, %{tag: [tag]})

defp restrict_hashtag_any(query, _), do: query
857
# Shared failure for restrictions that need the joined object while
# `skip_preload: true` was requested.
defp raise_on_missing_preload,
  do: raise("Can't use the child object without preloading!")
861
# Keeps activities whose recipients overlap with `recipients`. With a user the
# user's own activities are admitted as well, through `or_where`. An empty
# recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
875
# Keeps only local activities when `local_only: true` is given.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
881
# Keeps only activities whose actor equals `:actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
887
# Restricts the activity "type": a binary is compared for equality, any other
# `:type` value is passed to `ANY(?)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
897
# Keeps only activities whose "state" equals `:state`, when given.
defp restrict_state(query, %{state: state}),
  do: where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))

defp restrict_state(query, _), do: query
903
# Keeps only activities whose object lists `:favorited_by` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
912
# With `only_media: true`, keeps "Create" activities whose object
# "attachment" differs from an empty list. Raises when `:only_media` is
# combined with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
926
# Reply filtering, selected by the options:
#
#   * `exclude_replies: true` - keeps only objects without "inReplyTo".
#   * `reply_visibility: "self"` with a `:reply_filtering_user` - keeps
#     non-replies and activities that list the user among the recipients.
#   * `reply_visibility: "following"` with a `:reply_filtering_user` - see the
#     inline SQL comments of that clause.
#
# Anything else leaves the query untouched.
defp restrict_replies(query, %{exclude_replies: true}) do
  from(
    [_activity, object] in query,
    where: fragment("?->>'inReplyTo' is null", object.data)
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  from(
    [activity, object] in query,
    where:
      fragment(
        "?->>'inReplyTo' is null OR ? = ANY(?)",
        object.data,
        ^user.ap_id,
        activity.recipients
      )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  # The fragment arguments are positional: they fill the `?` placeholders of
  # the SQL below in order.
  from(
    [activity, object] in query,
    where:
      fragment(
        """
        ?->>'type' != 'Create' -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
        -- unless they are the author (because authors
        -- are also part of the recipients). This leads
        -- to a bug that self-replies by friends won't
        -- show up.
        OR ? = ? -- The actor is us
        """,
        activity.data,
        object.data,
        ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
  )
end

defp restrict_replies(query, _), do: query
980
# Drops "Announce" activities when `exclude_reblogs: true` is given.
defp restrict_reblogs(query, %{exclude_reblogs: true}),
  do: where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))

defp restrict_reblogs(query, _), do: query
986
# Hides activities authored by, or addressed ("to") to, users muted by the
# `:muting_user`; the user's own activities are exempt from the "to" check.
# Unless preloading is skipped, rows with a thread mute are hidden as well.
# `with_muted: true` disables the restriction. The muted AP ids can be
# supplied through `:muted_users_ap_ids`.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
1013
# Applies the block rules of `:blocking_user` to the query. The blocked AP ids
# can be supplied through `:blocked_users_ap_ids`; domain blocks come from the
# user. The object is joined first when the query has no `:object` binding.
# Without a `:blocking_user` the query is left untouched.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  query =
    if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

  from(
    [activity, object: o] in query,
    # The actor is not blocked.
    where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
    # No recipient is blocked, unless the activity is the user's own.
    where:
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      ),
    # Checked by the SQL function `recipients_contain_blocked_domains`.
    where:
      fragment(
        "recipients_contain_blocked_domains(?, ?) = false",
        activity.recipients,
        ^domain_blocks
      ),
    # Not an Announce whose "to" contains a blocked user.
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      ),
    # The actor's host is not domain-blocked, unless the actor is followed.
    where:
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      ),
    # Same host check for the actor of the joined object.
    where:
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
  )
end

defp restrict_blocked(query, _), do: query
1067
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1081
# With `pinned: true`, keeps only activities whose id is listed in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _), do: query
1087
# Hides "Announce" activities from actors whose reblogs the `:muting_user`
# muted. The AP ids can be supplied through `:reblog_muted_users_ap_ids`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1104
# Keeps only activities whose actor host (third '/'-separated part of the
# `actor` column) equals the binary `:instance`.
# NOTE(review): the SQL references the bare `actor` column rather than a
# binding, presumably to match an expression index; confirm before changing.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _), do: query
1113
# Applies the user's composed filter regex (`Filter.compose_regex/1`) to the
# object's "content": matching activities are dropped unless the user is the
# actor. When no regex is composed (nil) the query is left as-is.
# The object is expected to be the second positional binding of the query.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    viewer_ap_id = user.ap_id

    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^viewer_ap_id
    )
  end
end

# Without a `:user`, fall back to the blocking user's filters.
defp restrict_filtered(query, %{blocking_user: %User{} = blocking_user}) do
  restrict_filtered(query, %{user: blocking_user})
end

defp restrict_filtered(query, _opts), do: query
1133
# Unless `include_poll_votes: true` is given, drops activities whose object
# has type "Answer". Only applies when the query has an `:object` named binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1145
# Unless `include_chat_messages: true` is given, drops activities whose object
# has type "ChatMessage". Only applies when the query has an `:object` named binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))

    false ->
      query
  end
end
1157
# Unless `invisible_actors: true` is given, loads the ap_ids of all users
# flagged invisible and excludes activities authored by them.
# Note: this runs a database query (`Repo.all/1`) while the query is being built.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_ap_ids =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()
    |> Enum.map(& &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1168
# With a binary `:exclude_id` option, removes the activity with that id.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _opts), do: query
1174
# Preloads the object via `Activity.with_preloaded_object/1` unless
# `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1181
# Preloads the bookmark for `opts[:user]` via `Activity.with_preloaded_bookmark/2`
# unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1188
# Preloads report notes via `Activity.with_preloaded_report_notes/1`, but only
# when `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1195
# Sets the thread-muted field via `Activity.with_set_thread_muted_field/2`
# for the muting user (falling back to `opts[:user]`), unless
# `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, viewer)
end
1202
# Orders by id when `:order` is `:desc` or `:asc`; any other options leave
# the query's ordering alone.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1214
# Loads the muting user's outgoing relationship ap_ids in a single call to
# `User.outgoing_relationships_ap_ids/2` and returns three option maps
# `{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}`,
# each being `opts` plus the matching preloaded list. Keys already present in
# `opts` are kept as given. Blocks are only preloaded when the blocking user
# is the same as the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocking_user && blocking_user == muting_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(muting_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
1236
@doc """
Builds the activity query for `recipients`, narrowed by `opts`.

Starting from `Activity`, the query goes through the optional preload and
ordering steps, then every `restrict_*` and `exclude_*` step; each step
inspects `opts` and leaves the query untouched when its option is absent.
Hashtag restrictions are applied last, using the strategy selected by
`Config.object_embedded_hashtags?/0` and `Config.improved_hashtag_timeline/0`.

Returns the composed query.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  # Block/mute lists are preloaded once and handed to the matching steps.
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` is applied exactly once: applying it a second time
  # only repeats the same regex predicate in the WHERE clause.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  cond do
    Config.object_embedded_hashtags?() ->
      query
      |> restrict_tag(opts)
      |> restrict_tag_reject(opts)
      |> restrict_tag_all(opts)

    # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
    Config.improved_hashtag_timeline() == :join ->
      query
      |> distinct([activity], true)
      |> restrict_hashtag_any(opts)
      |> restrict_hashtag_all(opts)
      |> restrict_hashtag_reject_any(opts)

    true ->
      restrict_hashtag(query, opts)
  end
end
1297
@doc """
Fetches one page of activities addressed to `recipients`.

The list memberships of `opts[:user]` are added to the recipients before
querying. The paginated result is reversed, and `maybe_update_cc/3` then
adjusts "cc" for activities delivered through one of those lists.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1306
@doc """
Fetches the activities a user has favourited, newest favourite first.

Results are ordered by the id of the "Like" activity rather than by the id
of the liked activity; the like's id is also exposed as `pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  favourites_query =
    from([like, object, activity] in likes_with_targets,
      select: %{activity | object: object, pagination_id: like.id},
      order_by: [desc_nulls_last: like.id]
    )

  # The ordering is already set above, so pagination must not add its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1324
# For a user with at least one list membership: every activity whose "bcc"
# contains one of those memberships gets the user's ap_id prepended to "cc".
# All other activities, and all calls without memberships or without a user,
# are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single value; List.wrap/1 keeps the
        # result a proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1340
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` and also include the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1349
@doc """
Fetches one page of activities using the bounded recipient check.

The base query is built with an empty recipient list; recipient filtering is
done by `fetch_activities_bounded_query/3` instead. The paginated result is
reversed before it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1361
@doc """
Stores `file` via `Upload.store/2` and inserts an `Object` holding the
resulting data, with "actor" set when `opts[:actor]` is present.

Any non-`{:ok, data}` result from `Upload.store/2` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, upload_data} ->
      object_data = Maps.put_if_present(upload_data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1370
# Extracts a URL string from an actor's "url" value, which may be a plain
# string, a map with a binary "href", or a list of such values (only the
# first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1382
# Converts an ActivityPub actor object (a string-keyed map) into the
# atom-keyed attribute map used to build a remote user.
#
# The input comes from remote servers, so unexpected shapes are tolerated
# instead of raising:
#   * "icon" / "image" that are not maps with a "url" produce no avatar/banner
#   * "attachment" / "tag" may be a list, a single object, or missing
#   * attachments without a "type" and Emoji tags without an icon URL or a
#     binary name are skipped
#   * a non-map "capabilities" is treated as empty
#
# Avatar, banner, fields, emoji, the locked flag and capabilities are read
# from the object as received; every other value is read after
# `Transmogrifier.maybe_fix_user_object/1` has been applied.
defp object_to_user_data(data) do
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = if is_map(data["capabilities"]), do: data["capabilities"], else: %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1471
@doc """
Fetches the remote following and followers collections of `user` and derives
follow statistics from them.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:following_count` and `:follower_count`; counts that are not integers are
reported as 0. `{:error, _}` results are passed through, and any other
failing value is wrapped in `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  fetch_collection = &Fetcher.fetch_and_contain_remote_object_from_id/1

  with {:ok, following} <- fetch_collection.(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <- fetch_collection.(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      hide_followers: hide_followers,
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"])
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1491
# Returns the counter unchanged when it is an integer, and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1494
@doc """
Merges remote follow statistics into `user_data[:info]` when external user
synchronization is enabled, the type is "Person" or "Service", and both
collection addresses are present.

`user_data` is always returned. It is returned unchanged when a precondition
is false; any other failure is logged as an error first.
"""
# NOTE(review): the type check reads `user_data[:type]`, while
# `object_to_user_data/1` in this module stores the actor type under
# `:actor_type` — confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, follow_info)
    Map.put(user_data, :info, merged_info)
  else
    {precondition, false}
    when precondition in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1525
# Decides whether a remote collection is private.
#   * "first" is an embedded (Ordered)CollectionPage      -> {:ok, false}
#   * "first" is fetched and is an (Ordered)CollectionPage -> {:ok, false}
#   * fetching "first" fails with HTTP 401 or 403          -> {:ok, true}
#   * the collection has no "first"                        -> {:ok, true}
# Other fetch results are returned as errors.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1542
@doc """
Runs the actor object through `MRF.filter/1` and converts the filtered object
into user attributes.

Returns `{:ok, user_data}`; any non-`{:ok, _}` filter result is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejection -> {:error, rejection}
  end
end
1550
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes,
including follow information when that is available.

Returns `{:ok, user_data}` or `{:error, reason}`. Failures are logged at a
level that depends on the reason: debug for a deleted object, info for a
rejection, error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, error} ->
      case error do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(error)}")

        {:reject, reason} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(error)}")
      end

      {:error, error}
  end
end
1570
@doc """
Frees up `data[:nickname]` when a different, already-stored user holds it.

If a user with that nickname exists under another ap_id, the old user is
renamed to "<id>.<nickname>" and the result of the update is returned. If the
ap_ids are equal, this is only logged. Returns nil when there is no binary
nickname or no user with that nickname.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  existing = if is_binary(nickname), do: User.get_by_nickname(nickname)

  case existing do
    %User{ap_id: old_ap_id} = old_user ->
      if data[:ap_id] == old_ap_id do
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        renamed_nickname = "#{old_user.id}.#{old_user.nickname}"

        old_user
        |> User.remote_user_changeset(%{nickname: renamed_nickname})
        |> User.update_and_set_cache()
      end

    _ ->
      nil
  end
end
1594
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
an existing user is updated with the fetched data, while a new user is
inserted after any nickname clash has been handled. A failed fetch result is
returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1617
@doc """
Resolves `nickname` through WebFinger and builds the user from the returned
ap_id. Returns `{:error, "No AP id in WebFinger"}` when the lookup does not
yield a non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1625
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1630
# Post-processing hook for a single activity; currently it only performs the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1635
@doc """
Returns a query for "Create" activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  from(activity in direct_creates, order_by: [asc: activity.id])
end
1642 end