Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`: `to` and `cc` are the raw values of those
# keys (defaulting to `[]`), `recipients` is the concatenation of `to`, `cc`
# and `bcc`. For "Create" activities the actor is appended as well and
# duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing (or nil) actor must contribute nothing. The previous `[]`
  # default was wrapped in a list and leaked a stray `[]` into the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# An activity without an actor passes the check; otherwise the cached user
# looked up by AP id must exist and have `is_active: true`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
63
# True when the embedded object's content is no longer than the configured
# `[:instance, :remote_limit]`. Data without object content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
70
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public,
otherwise returns `{:ok, actor}` untouched.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public,
otherwise returns `{:ok, actor}` untouched.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies
# counter of the replied-to object, but only when the new object is public
# (returns nil otherwise). Any other data is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
# Types that are stored through `Object.create/1`; everything else is
# inserted as an activity.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

# Persists an object of one of `@object_types` and passes `meta` through.
# A non-`{:ok, _}` result of `Object.create/1` is returned unchanged.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Persists any other data as an `Activity` row. `meta` must contain `:local`
# (`Keyword.fetch!/2` raises otherwise).
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
@doc """
Inserts the activity described by `map`.

If `Activity.normalize/1` already yields an activity for `map`, that activity
is returned as `{:ok, activity}` and nothing is inserted. Otherwise the data
goes through, in order: defaults, the actor check (skipped when
`bypass_actor_check` is truthy), the remote content limit, MRF filtering,
containment, object insertion and finally activity insertion.

With `fake` set to `true` nothing is written; an in-memory activity with the
id `"pleroma:fakeid"` is returned instead.

Error returns: `{:error, false}` for a failed actor check,
`{:error, :remote_limit}` for oversized content, `{:error, {:reject, _}}` for
an MRF rejection, `{:containment, _}` for a containment failure and any
`{:error, _}` produced by one of the steps.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       {:actor_check, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(prepared["actor"])},
       {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Rich media is fetched in a separate task, throttled by ConcurrentLimiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # Activity.normalize/1 found an existing activity.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:containment, _} = error ->
      error

    {:error, _} = error ->
      error
  end
end
167
# Inserts an Activity row for `data` and, on success, hands the activity to
# maybe_create_activity_expiration/1. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    other -> other
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation and
streams both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# When the activity data carries a `DateTime` under "expires_at", enqueues a
# PurgeExpiredActivity job for it and returns `{:ok, activity}`; an enqueue
# result other than `{:ok, _}` is returned unchanged. Activities without such
# an expiry are returned as `{:ok, activity}` right away.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# produce the expected shape, that value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        other ->
          other
      end

    other ->
      other
  end
end
211
# Returns the freshly (force-)preloaded participations of a conversation
# wrapped in `{:ok, _}`; any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
219
@doc """
Preloads the `:user` of the given participations and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
# Streams the participations of the conversation belonging to the object's
# context, but only if a direct activity can still be found in that context
# for `user`. Without a matching conversation the lookup result is returned
# unchanged; arguments of any other shape are a `:noop`.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = conversation ->
      conversation = Repo.preload(conversation, :participations)

      latest_id =
        fetch_latest_direct_activity_id_for_context(
          conversation.ap_id,
          %{user: user, blocking_user: user}
        )

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
# Streams "Create", "Announce" and "Delete" activities to the topics computed
# by `Topics.get_activity_topics/1`. Everything else is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
@doc """
Runs the creation of a "Create" activity inside a `Repo.transaction/1` and
unwraps the transaction result. A transaction failure is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
267
# Builds the "Create" data, inserts it and then, unless the activity is fake
# or the `:benchmark` env asks for a quick insert, updates counters, notifies,
# streams and federates. Any `{:error, _}` rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_create_data(extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
300
@doc """
Builds listen data from `params`, inserts it, then notifies, streams and
federates the resulting activity. `params[:local]` counts as local unless it
is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
@doc """
Runs the unfollow inside a `Repo.transaction/1` and unwraps the transaction
result. A transaction failure is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    other -> other
  end
end
329
# Marks the latest follow activity as "cancelled" and inserts, notifies,
# streams and federates the matching unfollow activity. Returns nil when
# there is no follow activity; `{:error, _}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
343
@doc """
Runs the creation of a flag (report) activity inside a `Repo.transaction/1`
and unwraps the transaction result. A transaction failure is returned as-is.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    other -> other
  end
end
350
# Inserts the flag activity, notifies/streams it, federates a stripped copy
# and mails a report to every superuser that has an email address.
# `{:error, _}` rolls the transaction back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  # The reported account is addressed via "cc" only when forwarding.
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      report_email =
        Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)

      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
392
@doc """
Inserts a "Move" activity from `origin` to `target`, federates it and
enqueues the "move_following" background job.

Returns an error tuple when `origin.ap_id` is not listed in
`target.also_known_as`; a failed insert is returned unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(params, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
418
@doc """
Builds the query for "Create" activities of `context`, newest first.

The recipients are the public address plus, when `opts[:user]` is set, that
user's AP id and followed addresses. Blocks, content filters, poll votes and
`opts[:exclude_id]` are applied through the corresponding helpers.
"""
def fetch_activities_for_context_query(context, opts) do
  reader = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if reader do
      [reader.ap_id | User.following(reader)] ++ public
    else
      public
    end

  base =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, reader)
    |> restrict_filtered(opts)

  from(activity in base,
    where:
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
448
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching
activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
455
@doc """
Returns the id of the newest activity with "direct" visibility in `context`,
or `nil` when there is none.

`skip_preload: true` is used as a default that `opts` may override.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)
  context_query = fetch_activities_for_context_query(context, query_opts)

  context_query
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
466
@doc """
Fetches a page of activities addressed to the public collection. The `:user`
option is dropped before the query is built.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], opts)

  public_query
  |> restrict_unlisted(opts)
  |> Pagination.fetch_paginated(opts, pagination)
end
476
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `:restrict_unlisted`
forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  restricted_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(restricted_opts, pagination)
end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
# Restricts the query to activities whose computed visibility equals (or is
# one of) the requested `:visibility`.
#
# Always returns a query. A nil visibility means "no restriction". An invalid
# visibility is logged and ignored, the same way exclude_visibility/2 treats
# invalid input. Previously these cases returned the `:ok` of Logger.error/1,
# which broke whatever was piped after this function.
defp restrict_visibility(query, %{visibility: nil}), do: query

defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 keeps the log call from raising on non-string list members.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
520
# Drops activities whose computed visibility equals (or is one of) the
# requested `:exclude_visibilities`. Invalid values are logged and the query
# is returned unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn v -> v in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
563
# Applies the `thread_visibility` SQL function for the reading user, unless
# thread containment is skipped through the third argument or on the user.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
578
@doc """
Fetches activities whose actor is `user`, as seen by `reading_user`, and
returns them in reversed fetch order.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
593
@doc """
Fetches the "Create" and "Announce" activities of `user`, as seen by
`reading_user`, and returns them in reversed fetch order.

Block and mute filtering for `reading_user` is only enabled when
`reading_user` does not block `user`. Pagination defaults to `:keyset`
unless `params[:pagination_type]` is set.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, pagination_type)
  |> Enum.reverse()
end
621
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
`:offset` pagination and returns them in reversed fetch order.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
633
# Recipient list for user timelines: empty in godmode, otherwise the public
# address plus, for a reading user, their AP id and followed addresses.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = [Constants.as_public()]

  if reading_user do
    public ++ [reading_user.ap_id | User.following(reading_user)]
  else
    public
  end
end
643
# Hides "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
663
# Keeps only activities with an id greater than `:since_id`; an empty string
# or a missing option leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
671
# Requires every tag in `:tag_all` to be present in the object's embedded
# "tag" array. A single binary is delegated to restrict_embedded_tag_any/2.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
688
# Requires at least one tag of `:tag` to be present in the object's embedded
# "tag" array. A single binary is wrapped into a list first.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
705
# Rejects objects whose embedded "tag" array contains any tag listed in
# `:tag_reject`. A single binary is wrapped into a list first.
# Raises when combined with `skip_preload: true`.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
723
# Requires the object to be linked (via hashtags_objects) to every hashtag
# named in `:tag_all`. A single binary is delegated to restrict_hashtag_any/2.
# Raises when combined with `skip_preload: true`.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: tag})
end

defp restrict_hashtag_all(query, _), do: query
750
# Requires the object to be linked (via hashtags_objects) to at least one
# hashtag named in `:tag`. A single binary is wrapped into a list first.
# Raises when combined with `skip_preload: true`.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^tags,
      object.id
    )
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_hashtag_any(query, %{tag: [tag]})
end

defp restrict_hashtag_any(query, _), do: query
776
# Rejects objects linked (via hashtags_objects) to any hashtag named in
# `:tag_reject`. A single binary is wrapped into a list first.
# Raises when combined with `skip_preload: true`.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ? LIMIT 1)
      """,
      ^tags_reject,
      object.id
    )
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
802
# Raised by filters that need the joined object when it was not preloaded.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
806
# Keeps activities whose recipients overlap with `recipients`. An empty list
# means no restriction. With a user, activities authored by that user are
# additionally let through by an `or_where`.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
820
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
826
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
832
# Keeps only activities whose actor equals `:actor_id`.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
838
# Keeps only activities of the given `:type`: a binary is compared for
# equality, any other value is matched with `ANY`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
848
# Keeps only activities whose data "state" equals `:state`.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
854
# Keeps only activities whose object lists `:favorited_by` in its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
863
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment" array.
#
# The filter needs the joined object, so it raises when combined with
# `skip_preload: true`. This guard now fires only for `only_media: true`;
# before it also raised for `only_media: false`/nil, where no filter is
# applied and the object is not needed.
defp restrict_media(_query, %{only_media: true, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  from(
    [activity, object] in query,
    where: fragment("(?)->>'type' = ?", activity.data, "Create"),
    where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
877
# Reply filtering:
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` keeps non-replies and replies addressed to
#     the filtering user;
#   * `reply_visibility: "following"` applies the rules spelled out in the
#     SQL comments below.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
931
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
937
# Hides activities authored by, or addressed ("to") to, users muted by the
# `:muting_user` (the user's own activities stay visible) and, unless preloads
# are skipped, activities from muted threads. `with_muted: true` disables the
# filter altogether.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    # The thread_mute binding is only joined when preloads are enabled.
    from([thread_mute: tm] in without_muted_users, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
964
# Applies the block list of the `:blocking_user`: blocked actors, activities
# addressed to blocked users (unless authored by the user), recipients on
# blocked domains, announces addressed to blocked users, and activities or
# objects from blocked domains unless their actor is followed.
# Joins the object when the query has no `:object` binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [_activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
1018
# With `restrict_unlisted: true`, drops activities that carry the public
# address in their "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
1032
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
1038
# Drops "Announce" activities by actors whose reblogs the `:muting_user` has
# muted. The list can be passed in via `:reblog_muted_users_ap_ids`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
1055
# Keeps only activities whose actor URL has `:instance` as its third
# "/"-separated part (the host of the URL).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _), do: query
1064
# Hides activities whose object content matches the regex composed from the
# user's filters; the user's own activities are never hidden. When only a
# `:blocking_user` is given, that user's filters are used.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      # Address the object through its named binding and join it on demand,
      # like restrict_blocked/2 does. A positional `object` binding breaks
      # when the object was not joined (e.g. `skip_preload: true` without a
      # blocking user).
      query =
        if has_named_binding?(query, :object),
          do: query,
          else: Activity.with_joined_object(query)

      from([activity, object: o] in query,
        where:
          fragment("not(?->>'content' ~* ?)", o.data, ^regex) or
            activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1084
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true`. Only applied when the object is joined.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
1096
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true`. Only applied when the object is joined.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [_activity, object: o],
        fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
      )

    false ->
      query
  end
end
1108
# Drops activities by actors returned from the `invisible: true` user query,
# unless `invisible_actors: true` is set.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, & &1.ap_id)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1119
# Excludes the activity with the given id when `:exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id),
  do: where(query, [activity], activity.id != ^excluded_id)

defp exclude_id(query, _opts), do: query
1125
# Applies `Activity.with_preloaded_object/1` unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1132
# Applies `Activity.with_preloaded_bookmark/2` for `opts[:user]` unless
# `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
1139
# Applies `Activity.with_preloaded_report_notes/1` only when
# `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
1146
# Applies `Activity.with_set_thread_muted_field/2` for the muting user
# (falling back to `opts[:user]`) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1153
# Orders by id when `:order` is `:desc` or `:asc`; any other options leave the
# query unordered.
defp maybe_order(query, opts) do
  case opts do
    %{order: :desc} -> order_by(query, desc: :id)
    %{order: :asc} -> order_by(query, asc: :id)
    _ -> query
  end
end
1165
# Loads the muting user's outgoing relationship AP ids in one call and returns
# three option maps `{blocked_opts, muted_opts, muted_reblogs_opts}`.
# Each map is `opts` plus the matching preloaded list, unless `opts` already
# sets that key. Blocks are only included in the lookup when `:blocking_user`
# is set and equals `:muting_user`.
defp fetch_activities_query_ap_ids_ops(opts) do
  muter = opts[:muting_user]
  blocker = opts[:blocking_user]

  mute_kinds = if muter, do: [:mute, :reblog_mute], else: []

  relationship_kinds =
    if blocker && blocker == muter, do: [:block | mute_kinds], else: mute_kinds

  ap_ids_by_kind = User.outgoing_relationships_ap_ids(muter, relationship_kinds)

  {
    Map.put_new(opts, :blocked_users_ap_ids, ap_ids_by_kind[:block]),
    Map.put_new(opts, :muted_users_ap_ids, ap_ids_by_kind[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, ap_ids_by_kind[:reblog_mute])
  }
end
1187
@doc """
Builds the activity query for `recipients`, narrowed by `opts`.

The query starts from `Activity`, applies the optional preloads and ordering,
then every `restrict_*` / `exclude_*` step in turn. Hashtag restrictions come
last and use either the hashtag-table helpers or the embedded-tag helpers,
depending on `Config.improved_hashtag_timeline/0`.

Returns the composed query; nothing is executed here apart from the lookups
the individual steps perform themselves.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: a second application would only repeat the same
    # WHERE condition and the same `Filter.compose_regex/1` call.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> Activity.restrict_deactivated_users()
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.improved_hashtag_timeline() do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1242
@doc """
Fetches a page of activities for `recipients`.

The list memberships of `opts[:user]` are appended to the recipients before
querying. The paginated result is reversed, and activities addressed through
one of those lists get the user added to their `"cc"`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  memberships = Pleroma.List.memberships(viewer)

  page =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(memberships, viewer)
end
1251
@doc """
Fetches the activities a user has favourited, newest favourite first.

Results are ordered by the id of the `Like` activity, which is also exposed as
`pagination_id` on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  # Ordering is set explicitly below, so pagination must not add its own.
  pagination_params = Map.put(params, :skip_order, true)

  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  likes_with_targets
  |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  |> order_by([like, _, _], desc_nulls_last: like.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1269
# Adds the user's AP id to the "cc" of every activity whose "bcc" contains one
# of the user's list memberships. Activities are returned unchanged when there
# are no memberships, when no user is given, or when "bcc" is absent/empty.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `update_in/2` hands `nil` to the updater when "cc" is missing;
        # `List.wrap/1` keeps the result a proper list in that case and also
        # when "cc" is a single bare value.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1285
# Keeps activities whose recipients overlap `recipients`, or overlap
# `recipients_with_public` while also containing the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1294
@doc """
Fetches a page of activities bounded by two recipient lists.

An activity is included when its recipients overlap `recipients`, or when they
overlap `recipients_with_public` and the activity is also addressed to the
public address. The paginated result is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1306
@doc """
Stores `file` through `Upload.store/2` and inserts an object for it.

When `opts[:actor]` is present it is recorded under `"actor"` in the object
data. Any non-`{:ok, _}` result from the store step is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1315
# Extracts a URL string from an actor's "url" field, which may be a plain
# string, a map with a binary "href", or a list of such values (the first
# element wins). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    plain when is_binary(plain) -> plain
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1327
# Converts a remote actor object (a string-keyed map) into the attribute map
# used for remote users.
#
# The input comes from other servers, so malformed pieces are skipped rather
# than raising: non-map icon/image values, attachments that are not
# "PropertyValue" maps, and Emoji tags missing an icon url or a name.
# "attachment" and "tag" may each be a list, a single object, or absent.
#
# The result always contains :nickname; it is nil when the actor has no
# "preferredUsername".
defp object_to_user_data(data) do
  # Wraps an actor image field into the internal Image shape; anything that is
  # not a map carrying a truthy "url" yields nil.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  fields =
    data["attachment"]
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => "PropertyValue"} -> true
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  emojis =
    data["tag"]
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} when is_binary(name) ->
        [{String.trim(name, ":"), url}]

      _ ->
        []
    end)
    |> Map.new()

  # These two are read before `maybe_fix_user_object/1` rewrites `data`.
  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
1416
@doc """
Fetches follower/following information for `user` from its remote collections.

Both `user.following_address` and `user.follower_address` are fetched through
`Fetcher`. On success returns `{:ok, map}` with `:hide_follows`,
`:hide_followers`, `:follower_count` and `:following_count`; a non-integer
`"totalItems"` counts as 0. Any failing step is returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"])
    }

    {:ok, follow_info}
  else
    {:error, _} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1436
# Returns the counter when it is an integer; anything else counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1439
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

The fetch only happens when external user synchronization is enabled, the
`:type` of `user_data` is "Person" or "Service", and both collection addresses
are present. In every other case `user_data` is returned unchanged; unexpected
results are additionally logged as errors.
"""
# NOTE(review): `object_to_user_data/1` in this file emits `:actor_type`, not
# `:type`, so for maps built there the type check below looks like it always
# fails — confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
    Map.put(user_data, :info, merged_info)
  else
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    unexpected ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(unexpected)
      )

      user_data
  end
end
1470
# Decides whether a follow collection is hidden, returning `{:ok, boolean}`
# or an error tuple.
#   * an inlined "first" page of a collection-page type -> not private
#   * a "first" reference that fetches to such a page -> not private
#   * a fetch rejected with HTTP 401/403 -> private
#   * no "first" entry at all -> private
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1487
@doc """
Runs the actor object through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter accepts the object; any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1495
@doc """
Fetches the actor at `ap_id` and turns it into user data.

Returns `{:ok, user_data}` (after `maybe_update_follow_information/1`) or
`{:error, reason}`. The log level depends on the reason: debug for deleted
objects, info for rejections, error for anything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, failure} ->
      case failure do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

        {:reject, reason} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
      end

      {:error, failure}
  end
end
1515
@doc """
Renames an existing user that already holds the nickname in `data`.

When another user (different AP id) has `data[:nickname]`, that user's
nickname is changed to `"<id>.<nickname>"`. If the AP ids are equal only an
info message is logged. Returns nil when there is no binary nickname or no
user with it.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

      old_user
      |> User.remote_user_changeset(renamed)
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1539
@doc """
Creates or refreshes the user for `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
an existing user is updated from the fetched data, and a new user is inserted
after clashing nicknames have been handled. A failed fetch is returned as is.
"""
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)

  if existing && !User.ap_enabled?(existing) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if existing do
          existing
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1562
@doc """
Resolves `nickname` through WebFinger and builds the user from its AP id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or carries
no `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _ -> {:error, "No AP id in WebFinger"}
  end
end
1570
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1575
# Post-processing hook for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1580
@doc """
Returns the query for direct-visibility `Create` activities, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{type: "Create"})
  direct_messages = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_messages, [activity], asc: activity.id)
end
1587 end