[#3213] ActivityPub: fixed subquery-based hashtags filtering implementation (addresse...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the raw values of the
# "to"/"cc" keys (defaulting to `[]`) and `recipients` is the concatenation of
# "to", "cc" and "bcc".
#
# For "Create" activities the actor is appended to the recipients and the
# result is de-duplicated. The actor goes through `List.wrap/1` so that a
# missing or `nil` actor contributes nothing (instead of a stray `[]`/`nil`
# element) and a list-valued actor is appended flat instead of nested.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Any other activity type: plain concatenation, no actor, no de-duplication.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# Returns `true` when no actor is given, or when the cached user looked up by
# the given AP id is a `%User{}` with `is_active: true`; `false` for any other
# lookup result.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
63
# Checks the embedded object's "content" against the configured
# `[:instance, :remote_limit]`. Activities without an embedded, non-nil
# content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
70
@doc """
Increases the note count of `actor` when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Decreases the note count of `actor` when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies
# counter of the replied-to object, but only when the embedded object is
# public (returns `nil` otherwise). Everything else is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@doc """
Persists `object` and returns `{:ok, persisted, meta}`.

Maps whose "type" is one of `@object_types` are stored via `Object.create/1`.
Anything else is inserted as an `Activity` (using `meta[:local]`, which must
be present, and the recipients computed from the map), after which an
expiration job may be scheduled. Any non-matching intermediate result is
returned unchanged.
"""
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
@doc """
Inserts an activity described by `map`.

Steps, in order (the first one that does not match short-circuits into `else`):

  * if `Activity.normalize/1` already yields an `%Activity{}`, it is returned
    as `{:ok, activity}` and nothing is inserted;
  * defaults are applied with `lazy_put_activity_defaults/2`;
  * unless `bypass_actor_check` is truthy, the actor must pass
    `check_actor_is_active/1`, otherwise `{:error, false}` is returned;
  * the remote content limit is checked (`{:error, :remote_limit}` on failure);
  * the map is run through `MRF.filter/1` (a `{:reject, _}` result is wrapped
    as `{:error, {:reject, _}}`);
  * when `fake` is `true`, an unsaved `%Activity{}` with id `"pleroma:fakeid"`
    is built and returned instead of touching the database;
  * containment is checked, the child object is inserted and finally the
    activity itself (with optional expiration) is inserted.

NOTE(review): a containment failure is returned as the raw
`{:containment, _}` tuple, which is not covered by the `@spec` below.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       # Recipients are computed from the MRF-filtered map.
       {recipients, _, _} = get_recipients(map),
       # Tagged so the `else` branch can build the fake activity from `map`/`recipients`.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media data is fetched in a separately started task; its result is not awaited here.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # An activity for this map already exists.
    %Activity{} = activity ->
      {:ok, activity}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error

    # Fake insert: build the struct in memory only.
    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = e ->
      {:error, e}
  end
end
167
# Inserts an `%Activity{}` built from `data` and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation, then
streams out the activity followed by the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# When the activity data carries an "expires_at" `%DateTime{}`, enqueues a
# `PurgeExpiredActivity` job for it and returns `{:ok, activity}` if the
# enqueue succeeded (the enqueue result is returned unchanged otherwise).
# Activities without such a timestamp are returned as `{:ok, activity}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for `activity` and marks it as read for
# the user cached under the `actor` AP id. Returns `{:ok, conversation}` on
# success; the first non-matching intermediate result otherwise.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = author ->
          Participation.mark_as_read(author, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    failure ->
      failure
  end
end
211
# Returns the freshly (force-)preloaded participations of a conversation
# wrapped in `{:ok, _}`; any other input yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
219
@doc """
Preloads the `:user` of the given participations and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
227
@doc """
Streams out the participations of the conversation that `object`'s "context"
belongs to, provided a latest direct activity id can be fetched for that
conversation with `user` as both viewing and blocking user.

Returns `:noop` for arguments that do not carry a context.
"""
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, viewer_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed
by `Topics.get_activity_topics/1`. Every other activity is a `:noop`.
"""
@impl true
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
@doc """
Runs `do_create/2` inside a database transaction and unwraps the
transaction's `{:ok, result}`. A failed transaction is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
267
# Builds the "Create" data and inserts it. After a real (non-fake) insert the
# reply counter is bumped; unless running in the `:benchmark` env, the note
# counter is bumped, notifications/streams are sent and the activity is
# federated. An `{:error, _}` from any step rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local? = params[:local] != false
  published = params[:published]
  benchmark? = Config.get([:env]) == :benchmark

  base = %{to: to, actor: actor, published: published, context: context, object: object}
  create_data = make_create_data(base, extra)

  with {:ok, activity} <- insert(create_data, local?, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} -> {:ok, activity}
    {:quick_insert, true, activity} -> {:ok, activity}
    {:error, message} -> Repo.rollback(message)
  end
end
300
@doc """
Builds listen data from `params`, inserts it, then notifies, streams and
federates the resulting activity.

`params[:local]` is treated as `true` unless it is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local? = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, extra)

  with {:ok, activity} <- insert(listen_data, local?),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps the
transaction's `{:ok, result}`. A failed transaction is returned unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Marks the latest follow activity between the two users as "cancelled",
# inserts the matching unfollow activity, then notifies, streams and
# federates it. Returns `nil` when a step yields `nil` (e.g. no follow
# activity was found) and rolls back the transaction on `{:error, _}`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
343
@doc """
Runs `do_flag/1` inside a database transaction and unwraps the transaction's
`{:ok, result}`. A failed transaction is returned unchanged.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Inserts a flag (report) activity. The activity is addressed to nobody in
# "to"; "cc" holds the reported account's AP id only when forwarding is
# enabled. The status-stripped copy is what gets federated, and every
# superuser with an email address is sent a report email. An `{:error, _}`
# from any step rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local? = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local?),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn admin -> is_nil(admin.email) end)
    |> Enum.each(fn admin ->
      report_email = Pleroma.Emails.AdminEmail.report(admin, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
392
@doc """
Inserts a "Move" activity from `origin` to `target`.

The target must list the origin's AP id in `also_known_as`; otherwise an
error tuple is returned. After a successful insert the activity is notified,
streamed and federated, and a "move_following" background job is enqueued.
Any other insert result is returned unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
418
@doc """
Builds the query for the "Create" activities of a `context`, newest first.

Recipients are limited to the public collection, plus `opts[:user]`'s own AP
id and followings when a user is given. Block, filter, poll-vote and
`:exclude_id` restrictions are applied according to `opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  viewer = opts[:user]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  Activity
  |> from(as: :activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, viewer)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
448
@doc """
Runs the query built by `fetch_activities_for_context_query/2` and returns
all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
455
@doc """
Returns the id of the newest activity with "direct" visibility in `context`,
or `nil` when there is none.

`opts` may be a map or a keyword list; it is normalised with `Map.new/1`
before being merged, because `Map.merge/2` raises on keyword lists.
`skip_preload: true` is used unless `opts` overrides it.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Caller-supplied options win over the `skip_preload` default.
  query_opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
466
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` option is dropped before querying. Unlisted activities are
excluded only when `opts` has `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
476
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `restrict_unlisted: true`
forced into the options.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
483
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(actor, recipients,
# data)` SQL function result equals the requested visibility, or is any of a
# list of requested visibilities.
#
# Every clause returns a query, so the function is safe inside a pipeline:
# an invalid value is logged and `query` is returned unchanged (the log call's
# `:ok` must never leak out as the "query"), and a `nil` or absent
# `:visibility` is simply ignored, mirroring `exclude_visibility/2`.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
520
# Drops activities whose `activity_visibility(actor, recipients, data)` SQL
# function result matches `:exclude_visibilities` (a single visibility or a
# list). Invalid values are logged and the query is returned unchanged; a
# `nil` or absent option is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: excluded}) when is_list(excluded) do
  all_valid? = Enum.all?(excluded, fn item -> item in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^excluded
      )
    )
  else
    Logger.error("Could not exclude visibility to #{excluded}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: excluded})
     when excluded in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^excluded
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: excluded})
     when excluded not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{excluded}")
  query
end

defp exclude_visibility(query, _visibility), do: query
563
# Applies the `thread_visibility(ap_id, activity_id)` SQL check for the
# viewing user, unless thread containment is skipped through the third
# argument or the user's own `skip_thread_containment` flag. Without a
# viewing user the query is left untouched.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: viewer_ap_id}}, _) do
  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id') = true", ^viewer_ap_id, a.data)
  )
end

defp restrict_thread_visibility(query, _, _), do: query
578
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type, and returns them in reversed fetch order.

`params[:godmode]` is forwarded to the recipients computation.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
593
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`, returned in reversed fetch order.

Block and mute filtering for `reading_user` is enabled only when
`reading_user` does not block `user`. Pagination defaults to `:keyset`
unless `params[:pagination_type]` says otherwise.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, pagination_type)
  |> Enum.reverse()
end
621
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
`:offset` pagination, returned in reversed fetch order.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
633
# Recipient list used when reading a user's activities: empty in godmode;
# otherwise the public collection, plus the reading user's own AP id and
# followings when a reading user is present.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reader}) do
  public = Constants.as_public()

  if reader do
    [public, reader.ap_id | User.following(reader)]
  else
    [public]
  end
end
643
# Hides "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. The child object must be part of the query, so
# combining the option with `skip_preload: true` raises.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: author_ap_id}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^author_ap_id
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
663
# Keeps only activities with an id greater than `:since_id`. An empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
671
# Requires the object's embedded "tag" JSON to contain all of `:tag_all`
# (jsonb `?&`). A single binary tag is delegated to the "any" variant.
# Raises when `:tag_all` is combined with `skip_preload: true`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
688
# Requires the object's embedded "tag" JSON to contain at least one of `:tag`
# (jsonb `?|`). A single binary tag is wrapped into a list first.
# Raises when `:tag` is combined with `skip_preload: true`.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = wanted_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^wanted_tags)
  )
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
705
# Rejects objects whose embedded "tag" JSON contains any of `:tag_reject`
# (negated jsonb `?|`). A single binary tag is wrapped into a list first.
# Raises when `:tag_reject` is combined with `skip_preload: true`.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
723
# Requires the object to be linked (through `hashtags_objects`) to every
# hashtag named in `:tag_all`: the aggregated names of the matching hashtags
# must contain the requested list. A single binary tag is delegated to the
# "any" variant. Raises when combined with `skip_preload: true`.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^required_tags,
      object.id,
      ^required_tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: tag})
end

defp restrict_hashtag_all(query, _), do: query
750
# Requires the object to be linked (through `hashtags_objects`) to at least
# one hashtag named in `:tag`. A single binary tag is wrapped into a list
# first. Raises when combined with `skip_preload: true`.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = wanted_tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^wanted_tags,
      object.id
    )
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
776
# Rejects objects linked (through `hashtags_objects`) to any hashtag named in
# `:tag_reject`. A single binary tag is wrapped into a list first. Raises
# when combined with `skip_preload: true`.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^rejected_tags,
      object.id
    )
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
802
# Shared failure for filters that need the child object while the caller
# asked to skip preloading. Always raises a `RuntimeError`.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
806
# Keeps activities whose recipients overlap with `recipients`. An empty
# recipient list disables the restriction. When a user is given, an
# `or_where` on the user being the activity's actor is added as well.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
820
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
826
# With `:actor_id` set, keeps only activities by that actor.
defp restrict_actor(query, %{actor_id: actor_ap_id}) do
  where(query, [activity], activity.actor == ^actor_ap_id)
end

defp restrict_actor(query, _), do: query
832
# Filters on the activity "type": equality for a single binary, `ANY(...)`
# for any other `:type` value (expected to be a list of types).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
842
# With `:state` set, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: wanted_state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^wanted_state))
end

defp restrict_state(query, _), do: query
848
# With `:favorited_by` set, keeps only activities whose object lists that AP
# id in its "likes".
defp restrict_favorited_by(query, %{favorited_by: liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
857
# With `only_media: true`, keeps only "Create" activities whose object
# "attachment" differs from an empty list. Raises when `:only_media` is
# combined with `skip_preload: true`, since the child object is required.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
871
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and replies that have the
#     filtering user among the activity recipients;
#   * `reply_visibility: "following"` applies the conditions spelled out in
#     the SQL comments of the fragment below.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
925
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
931
# Hides activities by muted actors and activities addressed ("to") to muted
# actors, except the muting user's own. Unless preloading is skipped, rows
# with a `thread_mute` binding entry are hidden too. `with_muted: true`
# disables the whole restriction. The muted AP ids may be supplied through
# `opts[:muted_users_ap_ids]`; otherwise they are looked up for the user.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^muted_ap_ids,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
958
# Applies the blocking user's blocks. The object is joined if the query does
# not have an `:object` binding yet. In order, the filters hide:
#   1. activities by blocked actors;
#   2. activities whose recipients include a blocked actor (unless authored
#      by the blocking user);
#   3. activities whose recipients contain a blocked domain;
#   4. "Announce" activities addressed ("to") to a blocked actor;
#   5. activities by actors from a blocked domain, unless the actor is
#      followed;
#   6. activities whose object's actor is from a blocked domain, unless that
#      actor is followed.
# The blocked AP ids may be supplied through `opts[:blocked_users_ap_ids]`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1012
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc" (a missing "cc" is coalesced to an empty JSON object).
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1026
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _), do: query
1032
# Hides "Announce" activities by actors whose reblogs the muting user has
# muted. The AP ids may be supplied through `opts[:reblog_muted_users_ap_ids]`;
# otherwise they are looked up for the user.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1049
# With a binary `:instance`, keeps only activities whose actor URL has that
# host (third `/`-separated part of the actor).
#
# The actor column is passed through the `activity` binding instead of being
# written as a bare `actor` identifier in the SQL, so the reference stays
# unambiguous whichever tables the query has been joined with.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  from(
    activity in query,
    where: fragment("split_part(?::text, '/'::text, 3) = ?", activity.actor, ^instance)
  )
end

defp restrict_instance(query, _), do: query
1058
# Hides activities whose object content matches the regex composed from the
# user's filters (case-insensitive `~*`), except the user's own activities.
# When no regex is composed the query is untouched. `:blocking_user` is used
# as the filtering user when `:user` is not a `%User{}`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1078
# Drops activities whose object type is "Answer", unless
# `include_poll_votes: true` is given or the query has no `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1090
# Drops activities whose object type is "ChatMessage", unless
# `include_chat_messages: true` is given or the query has no `:object`
# binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1102
# Drops activities by users returned from the `invisible: true` user query,
# unless `invisible_actors: true` is given. The AP ids are loaded with a
# separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  hidden_ap_ids = Enum.map(invisible_users, fn %{ap_id: hidden} -> hidden end)

  where(query, [activity], activity.actor not in ^hidden_ap_ids)
end
1113
# Excludes the single activity whose id equals `:exclude_id` (binary ids only).
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _opts), do: query
1119
# Preloads the activities' objects unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1126
# Preloads bookmarks for `opts[:user]` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1133
# Preloads report notes only when explicitly requested via
# `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1140
# Sets the thread-muted field relative to `opts[:muting_user]`, falling back
# to `opts[:user]`; skipped entirely when `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1147
# Orders by activity id when `:order` is `:desc` or `:asc`; any other value
# leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1159
# Loads the muting user's outgoing relationship AP ids in one call and returns
# three option maps `{blocked_opts, muted_opts, reblog_muted_opts}`. Each is
# `opts` plus the matching preloaded id list; a key already present in `opts`
# is kept. Blocks are only preloaded when the blocking user is the same as the
# muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  muting_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if muting_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if blocking_user && blocking_user == muting_user,
      do: [:block | mute_relationships],
      else: mute_relationships

  ap_ids_by_type = User.outgoing_relationships_ap_ids(muting_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, ap_ids_by_type[:block]),
    Map.put_new(opts, :muted_users_ap_ids, ap_ids_by_type[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, ap_ids_by_type[:reblog_mute])
  }
end
1181
@doc """
Builds (without executing) the query for activities addressed to `recipients`.

`opts` selects which preload, ordering, restriction and exclusion steps take
effect. Tag filtering uses the hashtag-based filters when
`Config.improved_hashtag_timeline/0` is truthy and the embedded-tag filters
otherwise.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # restrict_filtered/2 is applied exactly once: a second application would
  # recompute the filter regex and add the very same WHERE predicate again.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.improved_hashtag_timeline() do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1235
@doc """
Fetches a page of activities addressed to `recipients` or to any list
membership returned by `Pleroma.List.memberships/1` for `opts[:user]`.

The paginated result is reversed before being returned, and activities
bcc'ed to one of those list memberships get the user's AP id prepended to
their `"cc"`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  addressed_to = recipients ++ list_memberships

  addressed_to
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1244
@doc """
Fetches the activities a user has favourited.

Results are ordered by the id of the corresponding `Like` activity,
descending, and that id is also exposed as each result's `pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # Ordering is already fixed above, so pagination must not add its own.
  Pagination.fetch_paginated(likes_query, Map.put(params, :skip_order, true), pagination)
end
1262
# For a user with list memberships, prepends the user's AP id to the "cc" of
# every activity whose "bcc" contains one of those memberships. Activities
# without a non-empty "bcc" list, and calls without memberships or without a
# user, are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a bare value; List.wrap/1 keeps the
        # result a proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1278
# Keeps activities whose recipients overlap `recipients`, or overlap
# `recipients_with_public` while also containing the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1287
@doc """
Fetches a page of activities bounded by two recipient sets.

An activity matches when its recipients overlap `recipients`, or when they
overlap `recipients_with_public` and also include the public address. The
paginated result is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1299
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data, with `"actor"` added when `opts[:actor]` is present.

Any non-`{:ok, data}` result from the store step is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1308
# Extracts a URL string from an actor "url" value: a plain string is returned
# as is, a map yields its binary "href", a list is resolved through its first
# element, and anything else gives nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1320
# Converts a remote actor object (string-keyed map) into the atom-keyed
# attribute map used to build a user.
#
# Malformed profile-field attachments and emoji tags are skipped instead of
# raising, since this data comes from remote servers.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # Only "PropertyValue" attachments are profile fields; entries that are not
  # maps with a "type" key are ignored rather than crashing the filter.
  fields =
    data
    |> Map.get("attachment", [])
    |> Enum.filter(fn
      %{"type" => type} -> type == "PropertyValue"
      _ -> false
    end)
    |> Enum.map(fn field -> Map.take(field, ["name", "value"]) end)

  # Keep only Emoji tags that carry both a binary name and an icon URL, so the
  # destructuring in Map.new/2 below cannot fail.
  emojis =
    data
    |> Map.get("tag", [])
    |> Enum.filter(fn
      %{"type" => "Emoji", "icon" => %{"url" => _}, "name" => name} -> is_binary(name)
      _ -> false
    end)
    |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
      {String.trim(name, ":"), url}
    end)

  # These two are read from the raw object, before the Transmogrifier fix-up.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  if data["preferredUsername"] do
    Map.put(
      user_data,
      :nickname,
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    )
  else
    Map.put(user_data, :nickname, nil)
  end
end
1409
@doc """
Fetches the remote following and followers collections of `user` and returns
`{:ok, info}` with the follow counts and whether each collection is hidden.

Non-integer `totalItems` values count as 0. Any failing step yields an
`{:error, reason}` tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    follow_info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, follow_info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1429
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1432
@doc """
Merges remote follow information into `user_data[:info]` when external user
synchronization is enabled, the actor type is "Person" or "Service", and both
collection addresses are present.

In every other case `user_data` is returned unchanged; unexpected failures
are logged as errors first.
"""
# NOTE(review): the type check reads `user_data[:type]`, while
# `object_to_user_data/1` in this module emits `:actor_type` — confirm which
# key callers are expected to supply.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    # Expected "not applicable" outcomes: return the data untouched, silently.
    {:enabled, false} ->
      user_data

    {:user_type_check, false} ->
      user_data

    {:collections_available, false} ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1463
# Decides whether a follow collection is hidden. An inline first page means it
# is public; a "first" reference is fetched, and a 401/403 response means it
# is private; a collection without "first" is treated as private.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1480
@doc """
Runs the actor object through `MRF.filter/1` and converts the accepted object
into user attributes.

Any result other than `{:ok, data}` from the filter is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1488
@doc """
Fetches the remote actor at `ap_id` and returns `{:ok, user_data}` ready for a
user changeset, including follow information when that applies.

Failures are returned as `{:error, reason}` and logged at a level that
depends on the reason: debug for deleted objects, info for rejections, error
for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = error ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      error
  end
end
1508
@doc """
Frees up a nickname that is already taken by a different user.

When another user (different AP id) holds `data[:nickname]`, that user is
renamed to `"<id>.<nickname>"`. When the holder has the same AP id, only an
info message is logged. Returns `nil` when there is no binary nickname or no
existing holder.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = existing <- User.get_by_nickname(nickname) do
    if data[:ap_id] == existing.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{existing.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed_attrs = %{nickname: "#{existing.id}.#{existing.nickname}"}

      existing
      |> User.remote_user_changeset(renamed_attrs)
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1532
@doc """
Creates or refreshes the local record of the remote user at `ap_id`.

A cached user that is not yet AP-enabled is upgraded through the
Transmogrifier. Otherwise the actor is fetched: an existing user is updated,
and a new one is inserted after resolving any nickname clash. A failed fetch
is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1555
@doc """
Resolves `nickname` through WebFinger and builds the user from the returned
AP id. Returns `{:error, "No AP id in WebFinger"}` when no AP id is found.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1563
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1568
# Post-processing hook for a single activity; currently only applies the
# broken-thread containment check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1573
@doc """
Returns the query for `Create` activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1580 end