Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
382 superuser
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
385 end)
386
387 {:ok, activity}
388 else
389 {:error, error} -> Repo.rollback(error)
390 end
391 end
392
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
395 params = %{
396 "type" => "Move",
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
400 }
401
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
406
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
410 })
411
412 {:ok, activity}
413 else
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
415 err -> err
416 end
417 end
418
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
421
422 recipients =
423 if opts[:user],
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
425 else: public
426
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
434 |> where(
435 [activity],
436 fragment(
437 "?->>'type' = ? and ?->>'context' = ?",
438 activity.data,
439 "Create",
440 activity.data,
441 ^context
442 )
443 )
444 |> exclude_poll_votes(opts)
445 |> exclude_id(opts)
446 |> order_by([activity], desc: activity.id)
447 end
448
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(opts)
453 |> Repo.all()
454 end
455
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 context
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
462 |> limit(1)
463 |> select([a], a.id)
464 |> Repo.one()
465 end
466
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
470
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
475 end
476
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 opts
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
482 end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 from(
490 a in query,
491 where:
492 fragment(
493 "activity_visibility(?, ?, ?) = ANY (?)",
494 a.actor,
495 a.recipients,
496 a.data,
497 ^visibility
498 )
499 )
500 else
501 Logger.error("Could not restrict visibility to #{visibility}")
502 end
503 end
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
507 from(
508 a in query,
509 where:
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
511 )
512 end
513
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518
519 defp restrict_visibility(query, _visibility), do: query
520
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 not fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not exclude visibility to #{visibility}")
537 query
538 end
539 end
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ?",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561
562 defp exclude_visibility(query, _visibility), do: query
563
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
565 do: query
566
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
568 do: query
569
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
571 from(
572 a in query,
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
574 )
575 end
576
577 defp restrict_thread_visibility(query, _, _), do: query
578
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
580 params =
581 params
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584
585 %{
586 godmode: params[:godmode],
587 reading_user: reading_user
588 }
589 |> user_activities_recipients()
590 |> fetch_activities(params)
591 |> Enum.reverse()
592 end
593
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
595 params =
596 params
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
601
602 params =
603 if User.blocks?(reading_user, user) do
604 params
605 else
606 params
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
609 end
610
611 pagination_type = Map.get(params, :pagination_type) || :keyset
612
613 %{
614 godmode: params[:godmode],
615 reading_user: reading_user
616 }
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
619 |> Enum.reverse()
620 end
621
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
624
625 %{
626 godmode: params[:godmode],
627 reading_user: reading_user
628 }
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
631 |> Enum.reverse()
632 end
633
634 defp user_activities_recipients(%{godmode: true}), do: []
635
636 defp user_activities_recipients(%{reading_user: reading_user}) do
637 if reading_user do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
639 else
640 [Constants.as_public()]
641 end
642 end
643
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
646 end
647
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
649 from(
650 [activity, object] in query,
651 where:
652 fragment(
653 "?->>'type' != ? or ?->>'actor' != ?",
654 activity.data,
655 "Announce",
656 object.data,
657 ^actor
658 )
659 )
660 end
661
662 defp restrict_announce_object_actor(query, _), do: query
663
664 defp restrict_since(query, %{since_id: ""}), do: query
665
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
668 end
669
670 defp restrict_since(query, _), do: query
671
672 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
673 raise_on_missing_preload()
674 end
675
676 defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
677 from(
678 [_activity, object] in query,
679 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
680 )
681 end
682
683 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
684 restrict_embedded_tag_any(query, %{tag: tag})
685 end
686
687 defp restrict_embedded_tag_all(query, _), do: query
688
689 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
690 raise_on_missing_preload()
691 end
692
693 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do
694 from(
695 [_activity, object] in query,
696 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
697 )
698 end
699
700 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
701 restrict_embedded_tag_any(query, %{tag: [tag]})
702 end
703
704 defp restrict_embedded_tag_any(query, _), do: query
705
706 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
707 raise_on_missing_preload()
708 end
709
710 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
711 when is_list(tag_reject) do
712 from(
713 [_activity, object] in query,
714 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
715 )
716 end
717
718 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
719 when is_binary(tag_reject) do
720 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
721 end
722
723 defp restrict_embedded_tag_reject_any(query, _), do: query
724
725 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
726 raise_on_missing_preload()
727 end
728
729 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
730 from(
731 [_activity, object] in query,
732 where:
733 fragment(
734 """
735 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
736 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
737 AND hashtags_objects.object_id = ?) @> ?
738 """,
739 ^tags,
740 object.id,
741 ^tags
742 )
743 )
744 end
745
746 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
747 restrict_hashtag_any(query, %{tag: tag})
748 end
749
750 defp restrict_hashtag_all(query, _), do: query
751
752 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
753 raise_on_missing_preload()
754 end
755
756 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
757 from(
758 [_activity, object] in query,
759 where:
760 fragment(
761 """
762 EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
763 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
764 AND hashtags_objects.object_id = ? LIMIT 1)
765 """,
766 ^tags,
767 object.id
768 )
769 )
770 end
771
772 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
773 restrict_hashtag_any(query, %{tag: [tag]})
774 end
775
776 defp restrict_hashtag_any(query, _), do: query
777
778 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
779 raise_on_missing_preload()
780 end
781
782 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
783 from(
784 [_activity, object] in query,
785 where:
786 fragment(
787 """
788 NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
789 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
790 AND hashtags_objects.object_id = ? LIMIT 1)
791 """,
792 ^tags_reject,
793 object.id
794 )
795 )
796 end
797
798 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
799 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
800 end
801
802 defp restrict_hashtag_reject_any(query, _), do: query
803
804 defp raise_on_missing_preload do
805 raise "Can't use the child object without preloading!"
806 end
807
808 defp restrict_recipients(query, [], _user), do: query
809
810 defp restrict_recipients(query, recipients, nil) do
811 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
812 end
813
814 defp restrict_recipients(query, recipients, user) do
815 from(
816 activity in query,
817 where: fragment("? && ?", ^recipients, activity.recipients),
818 or_where: activity.actor == ^user.ap_id
819 )
820 end
821
822 defp restrict_local(query, %{local_only: true}) do
823 from(activity in query, where: activity.local == true)
824 end
825
826 defp restrict_local(query, _), do: query
827
828 defp restrict_actor(query, %{actor_id: actor_id}) do
829 from(activity in query, where: activity.actor == ^actor_id)
830 end
831
832 defp restrict_actor(query, _), do: query
833
834 defp restrict_type(query, %{type: type}) when is_binary(type) do
835 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
836 end
837
838 defp restrict_type(query, %{type: type}) do
839 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
840 end
841
842 defp restrict_type(query, _), do: query
843
844 defp restrict_state(query, %{state: state}) do
845 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
846 end
847
848 defp restrict_state(query, _), do: query
849
850 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
851 from(
852 [_activity, object] in query,
853 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
854 )
855 end
856
857 defp restrict_favorited_by(query, _), do: query
858
859 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
860 raise "Can't use the child object without preloading!"
861 end
862
863 defp restrict_media(query, %{only_media: true}) do
864 from(
865 [activity, object] in query,
866 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
867 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
868 )
869 end
870
871 defp restrict_media(query, _), do: query
872
873 defp restrict_replies(query, %{exclude_replies: true}) do
874 from(
875 [_activity, object] in query,
876 where: fragment("?->>'inReplyTo' is null", object.data)
877 )
878 end
879
880 defp restrict_replies(query, %{
881 reply_filtering_user: %User{} = user,
882 reply_visibility: "self"
883 }) do
884 from(
885 [activity, object] in query,
886 where:
887 fragment(
888 "?->>'inReplyTo' is null OR ? = ANY(?)",
889 object.data,
890 ^user.ap_id,
891 activity.recipients
892 )
893 )
894 end
895
896 defp restrict_replies(query, %{
897 reply_filtering_user: %User{} = user,
898 reply_visibility: "following"
899 }) do
900 from(
901 [activity, object] in query,
902 where:
903 fragment(
904 """
905 ?->>'type' != 'Create' -- This isn't a Create
906 OR ?->>'inReplyTo' is null -- this isn't a reply
907 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
908 -- unless they are the author (because authors
909 -- are also part of the recipients). This leads
910 -- to a bug that self-replies by friends won't
911 -- show up.
912 OR ? = ? -- The actor is us
913 """,
914 activity.data,
915 object.data,
916 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
917 activity.recipients,
918 activity.actor,
919 activity.actor,
920 ^user.ap_id
921 )
922 )
923 end
924
925 defp restrict_replies(query, _), do: query
926
927 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
928 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
929 end
930
931 defp restrict_reblogs(query, _), do: query
932
933 defp restrict_muted(query, %{with_muted: true}), do: query
934
935 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
936 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
937
938 query =
939 from([activity] in query,
940 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
941 where:
942 fragment(
943 "not (?->'to' \\?| ?) or ? = ?",
944 activity.data,
945 ^mutes,
946 activity.actor,
947 ^user.ap_id
948 )
949 )
950
951 unless opts[:skip_preload] do
952 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
953 else
954 query
955 end
956 end
957
958 defp restrict_muted(query, _), do: query
959
960 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
961 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
962 domain_blocks = user.domain_blocks || []
963
964 following_ap_ids = User.get_friends_ap_ids(user)
965
966 query =
967 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
968
969 from(
970 [activity, object: o] in query,
971 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
972 where:
973 fragment(
974 "((not (? && ?)) or ? = ?)",
975 activity.recipients,
976 ^blocked_ap_ids,
977 activity.actor,
978 ^user.ap_id
979 ),
980 where:
981 fragment(
982 "recipients_contain_blocked_domains(?, ?) = false",
983 activity.recipients,
984 ^domain_blocks
985 ),
986 where:
987 fragment(
988 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
989 activity.data,
990 activity.data,
991 ^blocked_ap_ids
992 ),
993 where:
994 fragment(
995 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
996 activity.actor,
997 ^domain_blocks,
998 activity.actor,
999 ^following_ap_ids
1000 ),
1001 where:
1002 fragment(
1003 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1004 o.data,
1005 ^domain_blocks,
1006 o.data,
1007 ^following_ap_ids
1008 )
1009 )
1010 end
1011
1012 defp restrict_blocked(query, _), do: query
1013
1014 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1015 from(
1016 activity in query,
1017 where:
1018 fragment(
1019 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1020 activity.data,
1021 ^[Constants.as_public()]
1022 )
1023 )
1024 end
1025
1026 defp restrict_unlisted(query, _), do: query
1027
1028 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1029 from(activity in query, where: activity.id in ^ids)
1030 end
1031
1032 defp restrict_pinned(query, _), do: query
1033
1034 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1035 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1036
1037 from(
1038 activity in query,
1039 where:
1040 fragment(
1041 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1042 activity.data,
1043 activity.actor,
1044 ^muted_reblogs
1045 )
1046 )
1047 end
1048
1049 defp restrict_muted_reblogs(query, _), do: query
1050
1051 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1052 from(
1053 activity in query,
1054 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1055 )
1056 end
1057
1058 defp restrict_instance(query, _), do: query
1059
1060 defp restrict_filtered(query, %{user: %User{} = user}) do
1061 case Filter.compose_regex(user) do
1062 nil ->
1063 query
1064
1065 regex ->
1066 from([activity, object] in query,
1067 where:
1068 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1069 activity.actor == ^user.ap_id
1070 )
1071 end
1072 end
1073
1074 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1075 restrict_filtered(query, %{user: user})
1076 end
1077
1078 defp restrict_filtered(query, _), do: query
1079
1080 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1081
1082 defp exclude_poll_votes(query, _) do
1083 if has_named_binding?(query, :object) do
1084 from([activity, object: o] in query,
1085 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1086 )
1087 else
1088 query
1089 end
1090 end
1091
1092 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1093
1094 defp exclude_chat_messages(query, _) do
1095 if has_named_binding?(query, :object) do
1096 from([activity, object: o] in query,
1097 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1098 )
1099 else
1100 query
1101 end
1102 end
1103
1104 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1105
1106 defp exclude_invisible_actors(query, _opts) do
1107 invisible_ap_ids =
1108 User.Query.build(%{invisible: true, select: [:ap_id]})
1109 |> Repo.all()
1110 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1111
1112 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1113 end
1114
1115 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1116 from(activity in query, where: activity.id != ^id)
1117 end
1118
1119 defp exclude_id(query, _), do: query
1120
1121 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1122
1123 defp maybe_preload_objects(query, _) do
1124 query
1125 |> Activity.with_preloaded_object()
1126 end
1127
1128 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1129
1130 defp maybe_preload_bookmarks(query, opts) do
1131 query
1132 |> Activity.with_preloaded_bookmark(opts[:user])
1133 end
1134
1135 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1136 query
1137 |> Activity.with_preloaded_report_notes()
1138 end
1139
1140 defp maybe_preload_report_notes(query, _), do: query
1141
1142 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1143
1144 defp maybe_set_thread_muted_field(query, opts) do
1145 query
1146 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1147 end
1148
1149 defp maybe_order(query, %{order: :desc}) do
1150 query
1151 |> order_by(desc: :id)
1152 end
1153
1154 defp maybe_order(query, %{order: :asc}) do
1155 query
1156 |> order_by(asc: :id)
1157 end
1158
1159 defp maybe_order(query, _), do: query
1160
1161 defp fetch_activities_query_ap_ids_ops(opts) do
1162 source_user = opts[:muting_user]
1163 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1164
1165 ap_id_relationships =
1166 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1167 [:block | ap_id_relationships]
1168 else
1169 ap_id_relationships
1170 end
1171
1172 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1173
1174 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1175 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1176
1177 restrict_muted_reblogs_opts =
1178 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1179
1180 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1181 end
1182
1183 def fetch_activities_query(recipients, opts \\ %{}) do
1184 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1185 fetch_activities_query_ap_ids_ops(opts)
1186
1187 config = %{
1188 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1189 }
1190
1191 query =
1192 Activity
1193 |> maybe_preload_objects(opts)
1194 |> maybe_preload_bookmarks(opts)
1195 |> maybe_preload_report_notes(opts)
1196 |> maybe_set_thread_muted_field(opts)
1197 |> maybe_order(opts)
1198 |> restrict_recipients(recipients, opts[:user])
1199 |> restrict_replies(opts)
1200 |> restrict_since(opts)
1201 |> restrict_local(opts)
1202 |> restrict_actor(opts)
1203 |> restrict_type(opts)
1204 |> restrict_state(opts)
1205 |> restrict_favorited_by(opts)
1206 |> restrict_blocked(restrict_blocked_opts)
1207 |> restrict_muted(restrict_muted_opts)
1208 |> restrict_filtered(opts)
1209 |> restrict_media(opts)
1210 |> restrict_visibility(opts)
1211 |> restrict_thread_visibility(opts, config)
1212 |> restrict_reblogs(opts)
1213 |> restrict_pinned(opts)
1214 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1215 |> restrict_instance(opts)
1216 |> restrict_announce_object_actor(opts)
1217 |> restrict_filtered(opts)
1218 |> Activity.restrict_deactivated_users()
1219 |> exclude_poll_votes(opts)
1220 |> exclude_chat_messages(opts)
1221 |> exclude_invisible_actors(opts)
1222 |> exclude_visibility(opts)
1223
1224 if Config.improved_hashtag_timeline() do
1225 query
1226 |> restrict_hashtag_any(opts)
1227 |> restrict_hashtag_all(opts)
1228 |> restrict_hashtag_reject_any(opts)
1229 else
1230 query
1231 |> restrict_embedded_tag_any(opts)
1232 |> restrict_embedded_tag_all(opts)
1233 |> restrict_embedded_tag_reject_any(opts)
1234 end
1235 end
1236
1237 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1238 list_memberships = Pleroma.List.memberships(opts[:user])
1239
1240 fetch_activities_query(recipients ++ list_memberships, opts)
1241 |> Pagination.fetch_paginated(opts, pagination)
1242 |> Enum.reverse()
1243 |> maybe_update_cc(list_memberships, opts[:user])
1244 end
1245
1246 @doc """
1247 Fetch favorites activities of user with order by sort adds to favorites
1248 """
1249 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1250 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1251 user.ap_id
1252 |> Activity.Queries.by_actor()
1253 |> Activity.Queries.by_type("Like")
1254 |> Activity.with_joined_object()
1255 |> Object.with_joined_activity()
1256 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1257 |> order_by([like, _, _], desc_nulls_last: like.id)
1258 |> Pagination.fetch_paginated(
1259 Map.merge(params, %{skip_order: true}),
1260 pagination
1261 )
1262 end
1263
1264 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1265 Enum.map(activities, fn
1266 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1267 if Enum.any?(bcc, &(&1 in list_memberships)) do
1268 update_in(activity.data["cc"], &[user_ap_id | &1])
1269 else
1270 activity
1271 end
1272
1273 activity ->
1274 activity
1275 end)
1276 end
1277
1278 defp maybe_update_cc(activities, _, _), do: activities
1279
1280 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1281 from(activity in query,
1282 where:
1283 fragment("? && ?", activity.recipients, ^recipients) or
1284 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1285 ^Constants.as_public() in activity.recipients)
1286 )
1287 end
1288
1289 def fetch_activities_bounded(
1290 recipients,
1291 recipients_with_public,
1292 opts \\ %{},
1293 pagination \\ :keyset
1294 ) do
1295 fetch_activities_query([], opts)
1296 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1297 |> Pagination.fetch_paginated(opts, pagination)
1298 |> Enum.reverse()
1299 end
1300
1301 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1302 def upload(file, opts \\ []) do
1303 with {:ok, data} <- Upload.store(file, opts) do
1304 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1305
1306 Repo.insert(%Object{data: obj_data})
1307 end
1308 end
1309
1310 @spec get_actor_url(any()) :: binary() | nil
1311 defp get_actor_url(url) when is_binary(url), do: url
1312 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1313
1314 defp get_actor_url(url) when is_list(url) do
1315 url
1316 |> List.first()
1317 |> get_actor_url()
1318 end
1319
1320 defp get_actor_url(_url), do: nil
1321
1322 defp object_to_user_data(data) do
1323 avatar =
1324 data["icon"]["url"] &&
1325 %{
1326 "type" => "Image",
1327 "url" => [%{"href" => data["icon"]["url"]}]
1328 }
1329
1330 banner =
1331 data["image"]["url"] &&
1332 %{
1333 "type" => "Image",
1334 "url" => [%{"href" => data["image"]["url"]}]
1335 }
1336
1337 fields =
1338 data
1339 |> Map.get("attachment", [])
1340 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1341 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1342
1343 emojis =
1344 data
1345 |> Map.get("tag", [])
1346 |> Enum.filter(fn
1347 %{"type" => "Emoji"} -> true
1348 _ -> false
1349 end)
1350 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1351 {String.trim(name, ":"), url}
1352 end)
1353
1354 is_locked = data["manuallyApprovesFollowers"] || false
1355 capabilities = data["capabilities"] || %{}
1356 accepts_chat_messages = capabilities["acceptsChatMessages"]
1357 data = Transmogrifier.maybe_fix_user_object(data)
1358 is_discoverable = data["discoverable"] || false
1359 invisible = data["invisible"] || false
1360 actor_type = data["type"] || "Person"
1361
1362 public_key =
1363 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1364 data["publicKey"]["publicKeyPem"]
1365 else
1366 nil
1367 end
1368
1369 shared_inbox =
1370 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1371 data["endpoints"]["sharedInbox"]
1372 else
1373 nil
1374 end
1375
1376 user_data = %{
1377 ap_id: data["id"],
1378 uri: get_actor_url(data["url"]),
1379 ap_enabled: true,
1380 banner: banner,
1381 fields: fields,
1382 emoji: emojis,
1383 is_locked: is_locked,
1384 is_discoverable: is_discoverable,
1385 invisible: invisible,
1386 avatar: avatar,
1387 name: data["name"],
1388 follower_address: data["followers"],
1389 following_address: data["following"],
1390 bio: data["summary"] || "",
1391 actor_type: actor_type,
1392 also_known_as: Map.get(data, "alsoKnownAs", []),
1393 public_key: public_key,
1394 inbox: data["inbox"],
1395 shared_inbox: shared_inbox,
1396 accepts_chat_messages: accepts_chat_messages
1397 }
1398
1399 # nickname can be nil because of virtual actors
1400 if data["preferredUsername"] do
1401 Map.put(
1402 user_data,
1403 :nickname,
1404 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1405 )
1406 else
1407 Map.put(user_data, :nickname, nil)
1408 end
1409 end
1410
1411 def fetch_follow_information_for_user(user) do
1412 with {:ok, following_data} <-
1413 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1414 {:ok, hide_follows} <- collection_private(following_data),
1415 {:ok, followers_data} <-
1416 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1417 {:ok, hide_followers} <- collection_private(followers_data) do
1418 {:ok,
1419 %{
1420 hide_follows: hide_follows,
1421 follower_count: normalize_counter(followers_data["totalItems"]),
1422 following_count: normalize_counter(following_data["totalItems"]),
1423 hide_followers: hide_followers
1424 }}
1425 else
1426 {:error, _} = e -> e
1427 e -> {:error, e}
1428 end
1429 end
1430
1431 defp normalize_counter(counter) when is_integer(counter), do: counter
1432 defp normalize_counter(_), do: 0
1433
1434 def maybe_update_follow_information(user_data) do
1435 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1436 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1437 {_, true} <-
1438 {:collections_available,
1439 !!(user_data[:following_address] && user_data[:follower_address])},
1440 {:ok, info} <-
1441 fetch_follow_information_for_user(user_data) do
1442 info = Map.merge(user_data[:info] || %{}, info)
1443
1444 user_data
1445 |> Map.put(:info, info)
1446 else
1447 {:user_type_check, false} ->
1448 user_data
1449
1450 {:collections_available, false} ->
1451 user_data
1452
1453 {:enabled, false} ->
1454 user_data
1455
1456 e ->
1457 Logger.error(
1458 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1459 )
1460
1461 user_data
1462 end
1463 end
1464
1465 defp collection_private(%{"first" => %{"type" => type}})
1466 when type in ["CollectionPage", "OrderedCollectionPage"],
1467 do: {:ok, false}
1468
1469 defp collection_private(%{"first" => first}) do
1470 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1471 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1472 {:ok, false}
1473 else
1474 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1475 {:error, _} = e -> e
1476 e -> {:error, e}
1477 end
1478 end
1479
1480 defp collection_private(_data), do: {:ok, true}
1481
1482 def user_data_from_user_object(data) do
1483 with {:ok, data} <- MRF.filter(data) do
1484 {:ok, object_to_user_data(data)}
1485 else
1486 e -> {:error, e}
1487 end
1488 end
1489
1490 def fetch_and_prepare_user_from_ap_id(ap_id) do
1491 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1492 {:ok, data} <- user_data_from_user_object(data) do
1493 {:ok, maybe_update_follow_information(data)}
1494 else
1495 # If this has been deleted, only log a debug and not an error
1496 {:error, "Object has been deleted" = e} ->
1497 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1498 {:error, e}
1499
1500 {:error, {:reject, reason} = e} ->
1501 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1502 {:error, e}
1503
1504 {:error, e} ->
1505 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1506 {:error, e}
1507 end
1508 end
1509
1510 def maybe_handle_clashing_nickname(data) do
1511 with nickname when is_binary(nickname) <- data[:nickname],
1512 %User{} = old_user <- User.get_by_nickname(nickname),
1513 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1514 Logger.info(
1515 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1516 data[:ap_id]
1517 }, renaming."
1518 )
1519
1520 old_user
1521 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1522 |> User.update_and_set_cache()
1523 else
1524 {:ap_id_comparison, true} ->
1525 Logger.info(
1526 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1527 )
1528
1529 _ ->
1530 nil
1531 end
1532 end
1533
1534 def make_user_from_ap_id(ap_id) do
1535 user = User.get_cached_by_ap_id(ap_id)
1536
1537 if user && !User.ap_enabled?(user) do
1538 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1539 else
1540 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1541 if user do
1542 user
1543 |> User.remote_user_changeset(data)
1544 |> User.update_and_set_cache()
1545 else
1546 maybe_handle_clashing_nickname(data)
1547
1548 data
1549 |> User.remote_user_changeset()
1550 |> Repo.insert()
1551 |> User.set_cache()
1552 end
1553 end
1554 end
1555 end
1556
1557 def make_user_from_nickname(nickname) do
1558 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1559 make_user_from_ap_id(ap_id)
1560 else
1561 _e -> {:error, "No AP id in WebFinger"}
1562 end
1563 end
1564
1565 # filter out broken threads
1566 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1567 entire_thread_visible_for_user?(activity, user)
1568 end
1569
1570 # do post-processing on a specific activity
1571 def contain_activity(%Activity{} = activity, %User{} = user) do
1572 contain_broken_threads(activity, user)
1573 end
1574
1575 def fetch_direct_messages_query do
1576 Activity
1577 |> restrict_type(%{type: "Create"})
1578 |> restrict_visibility(%{visibility: "direct"})
1579 |> order_by([activity], asc: activity.id)
1580 end
1581 end