[#3213] ActivityPub: temporarily reverted to previous hashtags filtering implementati...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
38 defp get_recipients(%{"type" => "Create"} = data) do
39 to = Map.get(data, "to", [])
40 cc = Map.get(data, "cc", [])
41 bcc = Map.get(data, "bcc", [])
42 actor = Map.get(data, "actor", [])
43 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
44 {recipients, to, cc}
45 end
46
47 defp get_recipients(data) do
48 to = Map.get(data, "to", [])
49 cc = Map.get(data, "cc", [])
50 bcc = Map.get(data, "bcc", [])
51 recipients = Enum.concat([to, cc, bcc])
52 {recipients, to, cc}
53 end
54
55 defp check_actor_is_active(nil), do: true
56
57 defp check_actor_is_active(actor) when is_binary(actor) do
58 case User.get_cached_by_ap_id(actor) do
59 %User{is_active: true} -> true
60 _ -> false
61 end
62 end
63
64 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
65 limit = Config.get([:instance, :remote_limit])
66 String.length(content) <= limit
67 end
68
69 defp check_remote_limit(_), do: true
70
71 def increase_note_count_if_public(actor, object) do
72 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
73 end
74
75 def decrease_note_count_if_public(actor, object) do
76 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
77 end
78
79 defp increase_replies_count_if_reply(%{
80 "object" => %{"inReplyTo" => reply_ap_id} = object,
81 "type" => "Create"
82 }) do
83 if is_public?(object) do
84 Object.increase_replies_count(reply_ap_id)
85 end
86 end
87
88 defp increase_replies_count_if_reply(_create_data), do: :noop
89
90 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
91 @impl true
92 def persist(%{"type" => type} = object, meta) when type in @object_types do
93 with {:ok, object} <- Object.create(object) do
94 {:ok, object, meta}
95 end
96 end
97
98 @impl true
99 def persist(object, meta) do
100 with local <- Keyword.fetch!(meta, :local),
101 {recipients, _, _} <- get_recipients(object),
102 {:ok, activity} <-
103 Repo.insert(%Activity{
104 data: object,
105 local: local,
106 recipients: recipients,
107 actor: object["actor"]
108 }),
109 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
110 {:ok, _} <- maybe_create_activity_expiration(activity) do
111 {:ok, activity, meta}
112 end
113 end
114
115 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
116 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map, fake),
119 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
120 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {recipients, _, _} = get_recipients(map),
123 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
124 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
125 {:ok, map, object} <- insert_full_object(map),
126 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
127 # Splice in the child object if we have one.
128 activity = Maps.put_if_present(activity, :object, object)
129
130 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
131 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
132 end)
133
134 {:ok, activity}
135 else
136 %Activity{} = activity ->
137 {:ok, activity}
138
139 {:actor_check, _} ->
140 {:error, false}
141
142 {:containment, _} = error ->
143 error
144
145 {:error, _} = error ->
146 error
147
148 {:fake, true, map, recipients} ->
149 activity = %Activity{
150 data: map,
151 local: local,
152 actor: map["actor"],
153 recipients: recipients,
154 id: "pleroma:fakeid"
155 }
156
157 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
158 {:ok, activity}
159
160 {:remote_limit_pass, _} ->
161 {:error, :remote_limit}
162
163 {:reject, _} = e ->
164 {:error, e}
165 end
166 end
167
168 defp insert_activity_with_expiration(data, local, recipients) do
169 struct = %Activity{
170 data: data,
171 local: local,
172 actor: data["actor"],
173 recipients: recipients
174 }
175
176 with {:ok, activity} <- Repo.insert(struct) do
177 maybe_create_activity_expiration(activity)
178 end
179 end
180
181 def notify_and_stream(activity) do
182 Notification.create_notifications(activity)
183
184 conversation = create_or_bump_conversation(activity, activity.actor)
185 participations = get_participations(conversation)
186 stream_out(activity)
187 stream_out_participations(participations)
188 end
189
190 defp maybe_create_activity_expiration(
191 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
192 ) do
193 with {:ok, _job} <-
194 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
195 activity_id: activity.id,
196 expires_at: expires_at
197 }) do
198 {:ok, activity}
199 end
200 end
201
202 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
204 defp create_or_bump_conversation(activity, actor) do
205 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
206 %User{} = user <- User.get_cached_by_ap_id(actor) do
207 Participation.mark_as_read(user, conversation)
208 {:ok, conversation}
209 end
210 end
211
212 defp get_participations({:ok, conversation}) do
213 conversation
214 |> Repo.preload(:participations, force: true)
215 |> Map.get(:participations)
216 end
217
218 defp get_participations(_), do: []
219
220 def stream_out_participations(participations) do
221 participations =
222 participations
223 |> Repo.preload(:user)
224
225 Streamer.stream("participation", participations)
226 end
227
228 @impl true
229 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
230 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
231 conversation = Repo.preload(conversation, :participations)
232
233 last_activity_id =
234 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
235 user: user,
236 blocking_user: user
237 })
238
239 if last_activity_id do
240 stream_out_participations(conversation.participations)
241 end
242 end
243 end
244
245 @impl true
246 def stream_out_participations(_, _), do: :noop
247
248 @impl true
249 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
250 when data_type in ["Create", "Announce", "Delete"] do
251 activity
252 |> Topics.get_activity_topics()
253 |> Streamer.stream(activity)
254 end
255
256 @impl true
257 def stream_out(_activity) do
258 :noop
259 end
260
261 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
262 def create(params, fake \\ false) do
263 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
264 result
265 end
266 end
267
268 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
269 additional = params[:additional] || %{}
270 # only accept false as false value
271 local = !(params[:local] == false)
272 published = params[:published]
273 quick_insert? = Config.get([:env]) == :benchmark
274
275 create_data =
276 make_create_data(
277 %{to: to, actor: actor, published: published, context: context, object: object},
278 additional
279 )
280
281 with {:ok, activity} <- insert(create_data, local, fake),
282 {:fake, false, activity} <- {:fake, fake, activity},
283 _ <- increase_replies_count_if_reply(create_data),
284 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
285 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
286 _ <- notify_and_stream(activity),
287 :ok <- maybe_federate(activity) do
288 {:ok, activity}
289 else
290 {:quick_insert, true, activity} ->
291 {:ok, activity}
292
293 {:fake, true, activity} ->
294 {:ok, activity}
295
296 {:error, message} ->
297 Repo.rollback(message)
298 end
299 end
300
301 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
302 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
303 additional = params[:additional] || %{}
304 # only accept false as false value
305 local = !(params[:local] == false)
306 published = params[:published]
307
308 listen_data =
309 make_listen_data(
310 %{to: to, actor: actor, published: published, context: context, object: object},
311 additional
312 )
313
314 with {:ok, activity} <- insert(listen_data, local),
315 _ <- notify_and_stream(activity),
316 :ok <- maybe_federate(activity) do
317 {:ok, activity}
318 end
319 end
320
321 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
322 {:ok, Activity.t()} | nil | {:error, any()}
323 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
324 with {:ok, result} <-
325 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
326 result
327 end
328 end
329
330 defp do_unfollow(follower, followed, activity_id, local) do
331 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
332 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
333 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
334 {:ok, activity} <- insert(unfollow_data, local),
335 _ <- notify_and_stream(activity),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 nil -> nil
340 {:error, error} -> Repo.rollback(error)
341 end
342 end
343
344 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
345 def flag(params) do
346 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
347 result
348 end
349 end
350
351 defp do_flag(
352 %{
353 actor: actor,
354 context: _context,
355 account: account,
356 statuses: statuses,
357 content: content
358 } = params
359 ) do
360 # only accept false as false value
361 local = !(params[:local] == false)
362 forward = !(params[:forward] == false)
363
364 additional = params[:additional] || %{}
365
366 additional =
367 if forward do
368 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
369 else
370 Map.merge(additional, %{"to" => [], "cc" => []})
371 end
372
373 with flag_data <- make_flag_data(params, additional),
374 {:ok, activity} <- insert(flag_data, local),
375 {:ok, stripped_activity} <- strip_report_status_data(activity),
376 _ <- notify_and_stream(activity),
377 :ok <-
378 maybe_federate(stripped_activity) do
379 User.all_superusers()
380 |> Enum.filter(fn user -> not is_nil(user.email) end)
381 |> Enum.each(fn superuser ->
382 superuser
383 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
384 |> Pleroma.Emails.Mailer.deliver_async()
385 end)
386
387 {:ok, activity}
388 else
389 {:error, error} -> Repo.rollback(error)
390 end
391 end
392
393 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
394 def move(%User{} = origin, %User{} = target, local \\ true) do
395 params = %{
396 "type" => "Move",
397 "actor" => origin.ap_id,
398 "object" => origin.ap_id,
399 "target" => target.ap_id
400 }
401
402 with true <- origin.ap_id in target.also_known_as,
403 {:ok, activity} <- insert(params, local),
404 _ <- notify_and_stream(activity) do
405 maybe_federate(activity)
406
407 BackgroundWorker.enqueue("move_following", %{
408 "origin_id" => origin.id,
409 "target_id" => target.id
410 })
411
412 {:ok, activity}
413 else
414 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
415 err -> err
416 end
417 end
418
419 def fetch_activities_for_context_query(context, opts) do
420 public = [Constants.as_public()]
421
422 recipients =
423 if opts[:user],
424 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
425 else: public
426
427 from(activity in Activity)
428 |> maybe_preload_objects(opts)
429 |> maybe_preload_bookmarks(opts)
430 |> maybe_set_thread_muted_field(opts)
431 |> restrict_blocked(opts)
432 |> restrict_recipients(recipients, opts[:user])
433 |> restrict_filtered(opts)
434 |> where(
435 [activity],
436 fragment(
437 "?->>'type' = ? and ?->>'context' = ?",
438 activity.data,
439 "Create",
440 activity.data,
441 ^context
442 )
443 )
444 |> exclude_poll_votes(opts)
445 |> exclude_id(opts)
446 |> order_by([activity], desc: activity.id)
447 end
448
449 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
450 def fetch_activities_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(opts)
453 |> Repo.all()
454 end
455
456 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
457 FlakeId.Ecto.CompatType.t() | nil
458 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
459 context
460 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
461 |> restrict_visibility(%{visibility: "direct"})
462 |> limit(1)
463 |> select([a], a.id)
464 |> Repo.one()
465 end
466
467 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
468 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
469 opts = Map.delete(opts, :user)
470
471 [Constants.as_public()]
472 |> fetch_activities_query(opts)
473 |> restrict_unlisted(opts)
474 |> Pagination.fetch_paginated(opts, pagination)
475 end
476
477 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
478 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
479 opts
480 |> Map.put(:restrict_unlisted, true)
481 |> fetch_public_or_unlisted_activities(pagination)
482 end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
486 defp restrict_visibility(query, %{visibility: visibility})
487 when is_list(visibility) do
488 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
489 from(
490 a in query,
491 where:
492 fragment(
493 "activity_visibility(?, ?, ?) = ANY (?)",
494 a.actor,
495 a.recipients,
496 a.data,
497 ^visibility
498 )
499 )
500 else
501 Logger.error("Could not restrict visibility to #{visibility}")
502 end
503 end
504
505 defp restrict_visibility(query, %{visibility: visibility})
506 when visibility in @valid_visibilities do
507 from(
508 a in query,
509 where:
510 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
511 )
512 end
513
514 defp restrict_visibility(_query, %{visibility: visibility})
515 when visibility not in @valid_visibilities do
516 Logger.error("Could not restrict visibility to #{visibility}")
517 end
518
519 defp restrict_visibility(query, _visibility), do: query
520
521 defp exclude_visibility(query, %{exclude_visibilities: visibility})
522 when is_list(visibility) do
523 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
524 from(
525 a in query,
526 where:
527 not fragment(
528 "activity_visibility(?, ?, ?) = ANY (?)",
529 a.actor,
530 a.recipients,
531 a.data,
532 ^visibility
533 )
534 )
535 else
536 Logger.error("Could not exclude visibility to #{visibility}")
537 query
538 end
539 end
540
541 defp exclude_visibility(query, %{exclude_visibilities: visibility})
542 when visibility in @valid_visibilities do
543 from(
544 a in query,
545 where:
546 not fragment(
547 "activity_visibility(?, ?, ?) = ?",
548 a.actor,
549 a.recipients,
550 a.data,
551 ^visibility
552 )
553 )
554 end
555
556 defp exclude_visibility(query, %{exclude_visibilities: visibility})
557 when visibility not in [nil | @valid_visibilities] do
558 Logger.error("Could not exclude visibility to #{visibility}")
559 query
560 end
561
562 defp exclude_visibility(query, _visibility), do: query
563
564 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
565 do: query
566
567 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
568 do: query
569
570 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
571 from(
572 a in query,
573 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
574 )
575 end
576
577 defp restrict_thread_visibility(query, _, _), do: query
578
579 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
580 params =
581 params
582 |> Map.put(:user, reading_user)
583 |> Map.put(:actor_id, user.ap_id)
584
585 %{
586 godmode: params[:godmode],
587 reading_user: reading_user
588 }
589 |> user_activities_recipients()
590 |> fetch_activities(params)
591 |> Enum.reverse()
592 end
593
594 def fetch_user_activities(user, reading_user, params \\ %{}) do
595 params =
596 params
597 |> Map.put(:type, ["Create", "Announce"])
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600 |> Map.put(:pinned_activity_ids, user.pinned_activities)
601
602 params =
603 if User.blocks?(reading_user, user) do
604 params
605 else
606 params
607 |> Map.put(:blocking_user, reading_user)
608 |> Map.put(:muting_user, reading_user)
609 end
610
611 pagination_type = Map.get(params, :pagination_type) || :keyset
612
613 %{
614 godmode: params[:godmode],
615 reading_user: reading_user
616 }
617 |> user_activities_recipients()
618 |> fetch_activities(params, pagination_type)
619 |> Enum.reverse()
620 end
621
622 def fetch_statuses(reading_user, params) do
623 params = Map.put(params, :type, ["Create", "Announce"])
624
625 %{
626 godmode: params[:godmode],
627 reading_user: reading_user
628 }
629 |> user_activities_recipients()
630 |> fetch_activities(params, :offset)
631 |> Enum.reverse()
632 end
633
634 defp user_activities_recipients(%{godmode: true}), do: []
635
636 defp user_activities_recipients(%{reading_user: reading_user}) do
637 if reading_user do
638 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
639 else
640 [Constants.as_public()]
641 end
642 end
643
644 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
646 end
647
648 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
649 from(
650 [activity, object] in query,
651 where:
652 fragment(
653 "?->>'type' != ? or ?->>'actor' != ?",
654 activity.data,
655 "Announce",
656 object.data,
657 ^actor
658 )
659 )
660 end
661
662 defp restrict_announce_object_actor(query, _), do: query
663
664 defp restrict_since(query, %{since_id: ""}), do: query
665
666 defp restrict_since(query, %{since_id: since_id}) do
667 from(activity in query, where: activity.id > ^since_id)
668 end
669
670 defp restrict_since(query, _), do: query
671
672 defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
673 raise_on_missing_preload()
674 end
675
676 defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
677 from(
678 [_activity, object] in query,
679 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
680 )
681 end
682
683 defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
684 restrict_embedded_tag_any(query, %{tag: tag})
685 end
686
687 defp restrict_embedded_tag_all(query, _), do: query
688
689 defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
690 raise_on_missing_preload()
691 end
692
693 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do
694 from(
695 [_activity, object] in query,
696 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
697 )
698 end
699
700 defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
701 restrict_embedded_tag_any(query, %{tag: [tag]})
702 end
703
704 defp restrict_embedded_tag_any(query, _), do: query
705
706 defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
707 raise_on_missing_preload()
708 end
709
710 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
711 when is_list(tag_reject) do
712 from(
713 [_activity, object] in query,
714 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
715 )
716 end
717
718 defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
719 when is_binary(tag_reject) do
720 restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
721 end
722
723 defp restrict_embedded_tag_reject_any(query, _), do: query
724
725 # Groups by all bindings to allow aggregation on hashtags
726 defp group_by_all_bindings(query) do
727 # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
728 cond do
729 Enum.count(query.aliases) == 4 ->
730 from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
731
732 Enum.count(query.aliases) == 3 ->
733 from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
734
735 Enum.count(query.aliases) == 2 ->
736 from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
737
738 true ->
739 from([a, o] in query, group_by: [a.id, o.id])
740 end
741 end
742
743 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
744 raise_on_missing_preload()
745 end
746
747 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
748 query
749 |> group_by_all_bindings()
750 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
751 |> having(
752 [hashtag: hashtag],
753 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
754 )
755 end
756
757 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
758 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
759 end
760
761 defp restrict_hashtag_reject_any(query, _), do: query
762
763 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
764 raise_on_missing_preload()
765 end
766
767 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
768 Enum.reduce(
769 tags,
770 query,
771 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
772 )
773 end
774
775 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
776 restrict_hashtag_any(query, %{tag: tag})
777 end
778
779 defp restrict_hashtag_all(query, _), do: query
780
781 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
782 raise_on_missing_preload()
783 end
784
785 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
786 query =
787 from(
788 [_activity, object] in query,
789 join: hashtag in assoc(object, :hashtags),
790 where: hashtag.name in ^tags
791 )
792
793 if length(tags) > 1 do
794 distinct(query, [activity], true)
795 else
796 query
797 end
798 end
799
800 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
801 restrict_hashtag_any(query, %{tag: [tag]})
802 end
803
804 defp restrict_hashtag_any(query, _), do: query
805
806 defp raise_on_missing_preload do
807 raise "Can't use the child object without preloading!"
808 end
809
810 defp restrict_recipients(query, [], _user), do: query
811
812 defp restrict_recipients(query, recipients, nil) do
813 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
814 end
815
816 defp restrict_recipients(query, recipients, user) do
817 from(
818 activity in query,
819 where: fragment("? && ?", ^recipients, activity.recipients),
820 or_where: activity.actor == ^user.ap_id
821 )
822 end
823
824 defp restrict_local(query, %{local_only: true}) do
825 from(activity in query, where: activity.local == true)
826 end
827
828 defp restrict_local(query, _), do: query
829
830 defp restrict_actor(query, %{actor_id: actor_id}) do
831 from(activity in query, where: activity.actor == ^actor_id)
832 end
833
834 defp restrict_actor(query, _), do: query
835
836 defp restrict_type(query, %{type: type}) when is_binary(type) do
837 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
838 end
839
840 defp restrict_type(query, %{type: type}) do
841 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
842 end
843
844 defp restrict_type(query, _), do: query
845
846 defp restrict_state(query, %{state: state}) do
847 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
848 end
849
850 defp restrict_state(query, _), do: query
851
852 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
853 from(
854 [_activity, object] in query,
855 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
856 )
857 end
858
859 defp restrict_favorited_by(query, _), do: query
860
861 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
862 raise "Can't use the child object without preloading!"
863 end
864
865 defp restrict_media(query, %{only_media: true}) do
866 from(
867 [activity, object] in query,
868 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
869 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
870 )
871 end
872
873 defp restrict_media(query, _), do: query
874
875 defp restrict_replies(query, %{exclude_replies: true}) do
876 from(
877 [_activity, object] in query,
878 where: fragment("?->>'inReplyTo' is null", object.data)
879 )
880 end
881
882 defp restrict_replies(query, %{
883 reply_filtering_user: %User{} = user,
884 reply_visibility: "self"
885 }) do
886 from(
887 [activity, object] in query,
888 where:
889 fragment(
890 "?->>'inReplyTo' is null OR ? = ANY(?)",
891 object.data,
892 ^user.ap_id,
893 activity.recipients
894 )
895 )
896 end
897
898 defp restrict_replies(query, %{
899 reply_filtering_user: %User{} = user,
900 reply_visibility: "following"
901 }) do
902 from(
903 [activity, object] in query,
904 where:
905 fragment(
906 """
907 ?->>'type' != 'Create' -- This isn't a Create
908 OR ?->>'inReplyTo' is null -- this isn't a reply
909 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
910 -- unless they are the author (because authors
911 -- are also part of the recipients). This leads
912 -- to a bug that self-replies by friends won't
913 -- show up.
914 OR ? = ? -- The actor is us
915 """,
916 activity.data,
917 object.data,
918 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
919 activity.recipients,
920 activity.actor,
921 activity.actor,
922 ^user.ap_id
923 )
924 )
925 end
926
927 defp restrict_replies(query, _), do: query
928
929 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
930 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
931 end
932
933 defp restrict_reblogs(query, _), do: query
934
935 defp restrict_muted(query, %{with_muted: true}), do: query
936
937 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
938 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
939
940 query =
941 from([activity] in query,
942 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
943 where:
944 fragment(
945 "not (?->'to' \\?| ?) or ? = ?",
946 activity.data,
947 ^mutes,
948 activity.actor,
949 ^user.ap_id
950 )
951 )
952
953 unless opts[:skip_preload] do
954 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
955 else
956 query
957 end
958 end
959
960 defp restrict_muted(query, _), do: query
961
962 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
963 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
964 domain_blocks = user.domain_blocks || []
965
966 following_ap_ids = User.get_friends_ap_ids(user)
967
968 query =
969 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
970
971 from(
972 [activity, object: o] in query,
973 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
974 where:
975 fragment(
976 "((not (? && ?)) or ? = ?)",
977 activity.recipients,
978 ^blocked_ap_ids,
979 activity.actor,
980 ^user.ap_id
981 ),
982 where:
983 fragment(
984 "recipients_contain_blocked_domains(?, ?) = false",
985 activity.recipients,
986 ^domain_blocks
987 ),
988 where:
989 fragment(
990 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
991 activity.data,
992 activity.data,
993 ^blocked_ap_ids
994 ),
995 where:
996 fragment(
997 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
998 activity.actor,
999 ^domain_blocks,
1000 activity.actor,
1001 ^following_ap_ids
1002 ),
1003 where:
1004 fragment(
1005 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1006 o.data,
1007 ^domain_blocks,
1008 o.data,
1009 ^following_ap_ids
1010 )
1011 )
1012 end
1013
1014 defp restrict_blocked(query, _), do: query
1015
1016 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1017 from(
1018 activity in query,
1019 where:
1020 fragment(
1021 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1022 activity.data,
1023 ^[Constants.as_public()]
1024 )
1025 )
1026 end
1027
1028 defp restrict_unlisted(query, _), do: query
1029
1030 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1031 from(activity in query, where: activity.id in ^ids)
1032 end
1033
1034 defp restrict_pinned(query, _), do: query
1035
1036 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1037 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1038
1039 from(
1040 activity in query,
1041 where:
1042 fragment(
1043 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1044 activity.data,
1045 activity.actor,
1046 ^muted_reblogs
1047 )
1048 )
1049 end
1050
1051 defp restrict_muted_reblogs(query, _), do: query
1052
1053 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1054 from(
1055 activity in query,
1056 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1057 )
1058 end
1059
1060 defp restrict_instance(query, _), do: query
1061
1062 defp restrict_filtered(query, %{user: %User{} = user}) do
1063 case Filter.compose_regex(user) do
1064 nil ->
1065 query
1066
1067 regex ->
1068 from([activity, object] in query,
1069 where:
1070 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1071 activity.actor == ^user.ap_id
1072 )
1073 end
1074 end
1075
1076 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1077 restrict_filtered(query, %{user: user})
1078 end
1079
1080 defp restrict_filtered(query, _), do: query
1081
1082 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1083
1084 defp exclude_poll_votes(query, _) do
1085 if has_named_binding?(query, :object) do
1086 from([activity, object: o] in query,
1087 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1088 )
1089 else
1090 query
1091 end
1092 end
1093
1094 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1095
1096 defp exclude_chat_messages(query, _) do
1097 if has_named_binding?(query, :object) do
1098 from([activity, object: o] in query,
1099 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1100 )
1101 else
1102 query
1103 end
1104 end
1105
1106 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1107
1108 defp exclude_invisible_actors(query, _opts) do
1109 invisible_ap_ids =
1110 User.Query.build(%{invisible: true, select: [:ap_id]})
1111 |> Repo.all()
1112 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1113
1114 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1115 end
1116
1117 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1118 from(activity in query, where: activity.id != ^id)
1119 end
1120
1121 defp exclude_id(query, _), do: query
1122
1123 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1124
1125 defp maybe_preload_objects(query, _) do
1126 query
1127 |> Activity.with_preloaded_object()
1128 end
1129
1130 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1131
1132 defp maybe_preload_bookmarks(query, opts) do
1133 query
1134 |> Activity.with_preloaded_bookmark(opts[:user])
1135 end
1136
1137 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1138 query
1139 |> Activity.with_preloaded_report_notes()
1140 end
1141
1142 defp maybe_preload_report_notes(query, _), do: query
1143
1144 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1145
1146 defp maybe_set_thread_muted_field(query, opts) do
1147 query
1148 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1149 end
1150
1151 defp maybe_order(query, %{order: :desc}) do
1152 query
1153 |> order_by(desc: :id)
1154 end
1155
1156 defp maybe_order(query, %{order: :asc}) do
1157 query
1158 |> order_by(asc: :id)
1159 end
1160
1161 defp maybe_order(query, _), do: query
1162
1163 defp fetch_activities_query_ap_ids_ops(opts) do
1164 source_user = opts[:muting_user]
1165 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1166
1167 ap_id_relationships =
1168 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1169 [:block | ap_id_relationships]
1170 else
1171 ap_id_relationships
1172 end
1173
1174 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1175
1176 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1177 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1178
1179 restrict_muted_reblogs_opts =
1180 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1181
1182 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1183 end
1184
1185 def fetch_activities_query(recipients, opts \\ %{}) do
1186 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1187 fetch_activities_query_ap_ids_ops(opts)
1188
1189 config = %{
1190 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1191 }
1192
1193 query =
1194 Activity
1195 |> maybe_preload_objects(opts)
1196 |> maybe_preload_bookmarks(opts)
1197 |> maybe_preload_report_notes(opts)
1198 |> maybe_set_thread_muted_field(opts)
1199 |> maybe_order(opts)
1200 |> restrict_recipients(recipients, opts[:user])
1201 |> restrict_replies(opts)
1202 |> restrict_since(opts)
1203 |> restrict_local(opts)
1204 |> restrict_actor(opts)
1205 |> restrict_type(opts)
1206 |> restrict_state(opts)
1207 |> restrict_favorited_by(opts)
1208 |> restrict_blocked(restrict_blocked_opts)
1209 |> restrict_muted(restrict_muted_opts)
1210 |> restrict_filtered(opts)
1211 |> restrict_media(opts)
1212 |> restrict_visibility(opts)
1213 |> restrict_thread_visibility(opts, config)
1214 |> restrict_reblogs(opts)
1215 |> restrict_pinned(opts)
1216 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1217 |> restrict_instance(opts)
1218 |> restrict_announce_object_actor(opts)
1219 |> restrict_filtered(opts)
1220 |> Activity.restrict_deactivated_users()
1221 |> exclude_poll_votes(opts)
1222 |> exclude_chat_messages(opts)
1223 |> exclude_invisible_actors(opts)
1224 |> exclude_visibility(opts)
1225
1226 if Config.improved_hashtag_timeline() do
1227 query
1228 |> restrict_hashtag_any(opts)
1229 |> restrict_hashtag_all(opts)
1230 |> restrict_hashtag_reject_any(opts)
1231 else
1232 query
1233 |> restrict_embedded_tag_any(opts)
1234 |> restrict_embedded_tag_all(opts)
1235 |> restrict_embedded_tag_reject_any(opts)
1236 end
1237 end
1238
1239 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1240 list_memberships = Pleroma.List.memberships(opts[:user])
1241
1242 fetch_activities_query(recipients ++ list_memberships, opts)
1243 |> Pagination.fetch_paginated(opts, pagination)
1244 |> Enum.reverse()
1245 |> maybe_update_cc(list_memberships, opts[:user])
1246 end
1247
1248 @doc """
1249 Fetch favorites activities of user with order by sort adds to favorites
1250 """
1251 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1252 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1253 user.ap_id
1254 |> Activity.Queries.by_actor()
1255 |> Activity.Queries.by_type("Like")
1256 |> Activity.with_joined_object()
1257 |> Object.with_joined_activity()
1258 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1259 |> order_by([like, _, _], desc_nulls_last: like.id)
1260 |> Pagination.fetch_paginated(
1261 Map.merge(params, %{skip_order: true}),
1262 pagination
1263 )
1264 end
1265
1266 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1267 Enum.map(activities, fn
1268 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1269 if Enum.any?(bcc, &(&1 in list_memberships)) do
1270 update_in(activity.data["cc"], &[user_ap_id | &1])
1271 else
1272 activity
1273 end
1274
1275 activity ->
1276 activity
1277 end)
1278 end
1279
1280 defp maybe_update_cc(activities, _, _), do: activities
1281
1282 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1283 from(activity in query,
1284 where:
1285 fragment("? && ?", activity.recipients, ^recipients) or
1286 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1287 ^Constants.as_public() in activity.recipients)
1288 )
1289 end
1290
1291 def fetch_activities_bounded(
1292 recipients,
1293 recipients_with_public,
1294 opts \\ %{},
1295 pagination \\ :keyset
1296 ) do
1297 fetch_activities_query([], opts)
1298 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1299 |> Pagination.fetch_paginated(opts, pagination)
1300 |> Enum.reverse()
1301 end
1302
1303 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1304 def upload(file, opts \\ []) do
1305 with {:ok, data} <- Upload.store(file, opts) do
1306 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1307
1308 Repo.insert(%Object{data: obj_data})
1309 end
1310 end
1311
1312 @spec get_actor_url(any()) :: binary() | nil
1313 defp get_actor_url(url) when is_binary(url), do: url
1314 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1315
1316 defp get_actor_url(url) when is_list(url) do
1317 url
1318 |> List.first()
1319 |> get_actor_url()
1320 end
1321
1322 defp get_actor_url(_url), do: nil
1323
1324 defp object_to_user_data(data) do
1325 avatar =
1326 data["icon"]["url"] &&
1327 %{
1328 "type" => "Image",
1329 "url" => [%{"href" => data["icon"]["url"]}]
1330 }
1331
1332 banner =
1333 data["image"]["url"] &&
1334 %{
1335 "type" => "Image",
1336 "url" => [%{"href" => data["image"]["url"]}]
1337 }
1338
1339 fields =
1340 data
1341 |> Map.get("attachment", [])
1342 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1343 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1344
1345 emojis =
1346 data
1347 |> Map.get("tag", [])
1348 |> Enum.filter(fn
1349 %{"type" => "Emoji"} -> true
1350 _ -> false
1351 end)
1352 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1353 {String.trim(name, ":"), url}
1354 end)
1355
1356 is_locked = data["manuallyApprovesFollowers"] || false
1357 capabilities = data["capabilities"] || %{}
1358 accepts_chat_messages = capabilities["acceptsChatMessages"]
1359 data = Transmogrifier.maybe_fix_user_object(data)
1360 is_discoverable = data["discoverable"] || false
1361 invisible = data["invisible"] || false
1362 actor_type = data["type"] || "Person"
1363
1364 public_key =
1365 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1366 data["publicKey"]["publicKeyPem"]
1367 else
1368 nil
1369 end
1370
1371 shared_inbox =
1372 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1373 data["endpoints"]["sharedInbox"]
1374 else
1375 nil
1376 end
1377
1378 user_data = %{
1379 ap_id: data["id"],
1380 uri: get_actor_url(data["url"]),
1381 ap_enabled: true,
1382 banner: banner,
1383 fields: fields,
1384 emoji: emojis,
1385 is_locked: is_locked,
1386 is_discoverable: is_discoverable,
1387 invisible: invisible,
1388 avatar: avatar,
1389 name: data["name"],
1390 follower_address: data["followers"],
1391 following_address: data["following"],
1392 bio: data["summary"] || "",
1393 actor_type: actor_type,
1394 also_known_as: Map.get(data, "alsoKnownAs", []),
1395 public_key: public_key,
1396 inbox: data["inbox"],
1397 shared_inbox: shared_inbox,
1398 accepts_chat_messages: accepts_chat_messages
1399 }
1400
1401 # nickname can be nil because of virtual actors
1402 if data["preferredUsername"] do
1403 Map.put(
1404 user_data,
1405 :nickname,
1406 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1407 )
1408 else
1409 Map.put(user_data, :nickname, nil)
1410 end
1411 end
1412
1413 def fetch_follow_information_for_user(user) do
1414 with {:ok, following_data} <-
1415 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1416 {:ok, hide_follows} <- collection_private(following_data),
1417 {:ok, followers_data} <-
1418 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1419 {:ok, hide_followers} <- collection_private(followers_data) do
1420 {:ok,
1421 %{
1422 hide_follows: hide_follows,
1423 follower_count: normalize_counter(followers_data["totalItems"]),
1424 following_count: normalize_counter(following_data["totalItems"]),
1425 hide_followers: hide_followers
1426 }}
1427 else
1428 {:error, _} = e -> e
1429 e -> {:error, e}
1430 end
1431 end
1432
1433 defp normalize_counter(counter) when is_integer(counter), do: counter
1434 defp normalize_counter(_), do: 0
1435
1436 def maybe_update_follow_information(user_data) do
1437 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1438 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1439 {_, true} <-
1440 {:collections_available,
1441 !!(user_data[:following_address] && user_data[:follower_address])},
1442 {:ok, info} <-
1443 fetch_follow_information_for_user(user_data) do
1444 info = Map.merge(user_data[:info] || %{}, info)
1445
1446 user_data
1447 |> Map.put(:info, info)
1448 else
1449 {:user_type_check, false} ->
1450 user_data
1451
1452 {:collections_available, false} ->
1453 user_data
1454
1455 {:enabled, false} ->
1456 user_data
1457
1458 e ->
1459 Logger.error(
1460 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1461 )
1462
1463 user_data
1464 end
1465 end
1466
1467 defp collection_private(%{"first" => %{"type" => type}})
1468 when type in ["CollectionPage", "OrderedCollectionPage"],
1469 do: {:ok, false}
1470
1471 defp collection_private(%{"first" => first}) do
1472 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1473 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1474 {:ok, false}
1475 else
1476 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1477 {:error, _} = e -> e
1478 e -> {:error, e}
1479 end
1480 end
1481
1482 defp collection_private(_data), do: {:ok, true}
1483
1484 def user_data_from_user_object(data) do
1485 with {:ok, data} <- MRF.filter(data) do
1486 {:ok, object_to_user_data(data)}
1487 else
1488 e -> {:error, e}
1489 end
1490 end
1491
1492 def fetch_and_prepare_user_from_ap_id(ap_id) do
1493 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1494 {:ok, data} <- user_data_from_user_object(data) do
1495 {:ok, maybe_update_follow_information(data)}
1496 else
1497 # If this has been deleted, only log a debug and not an error
1498 {:error, "Object has been deleted" = e} ->
1499 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1500 {:error, e}
1501
1502 {:error, {:reject, reason} = e} ->
1503 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1504 {:error, e}
1505
1506 {:error, e} ->
1507 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1508 {:error, e}
1509 end
1510 end
1511
1512 def maybe_handle_clashing_nickname(data) do
1513 with nickname when is_binary(nickname) <- data[:nickname],
1514 %User{} = old_user <- User.get_by_nickname(nickname),
1515 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1516 Logger.info(
1517 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1518 data[:ap_id]
1519 }, renaming."
1520 )
1521
1522 old_user
1523 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1524 |> User.update_and_set_cache()
1525 else
1526 {:ap_id_comparison, true} ->
1527 Logger.info(
1528 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1529 )
1530
1531 _ ->
1532 nil
1533 end
1534 end
1535
1536 def make_user_from_ap_id(ap_id) do
1537 user = User.get_cached_by_ap_id(ap_id)
1538
1539 if user && !User.ap_enabled?(user) do
1540 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1541 else
1542 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1543 if user do
1544 user
1545 |> User.remote_user_changeset(data)
1546 |> User.update_and_set_cache()
1547 else
1548 maybe_handle_clashing_nickname(data)
1549
1550 data
1551 |> User.remote_user_changeset()
1552 |> Repo.insert()
1553 |> User.set_cache()
1554 end
1555 end
1556 end
1557 end
1558
1559 def make_user_from_nickname(nickname) do
1560 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1561 make_user_from_ap_id(ap_id)
1562 else
1563 _e -> {:error, "No AP id in WebFinger"}
1564 end
1565 end
1566
1567 # filter out broken threads
1568 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1569 entire_thread_visible_for_user?(activity, user)
1570 end
1571
1572 # do post-processing on a specific activity
1573 def contain_activity(%Activity{} = activity, %User{} = user) do
1574 contain_broken_threads(activity, user)
1575 end
1576
1577 def fetch_direct_messages_query do
1578 Activity
1579 |> restrict_type(%{type: "Create"})
1580 |> restrict_visibility(%{visibility: "direct"})
1581 |> order_by([activity], asc: activity.id)
1582 end
1583 end