2e25412c6ba7cd4e723da8ca0eeae94bc588b425
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
41 {recipients, to, cc}
42 end
43
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
49 {recipients, to, cc}
50 end
51
52 defp check_actor_is_active(nil), do: true
53
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
57 _ -> false
58 end
59 end
60
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
64 end
65
66 defp check_remote_limit(_), do: true
67
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
70 end
71
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
74 end
75
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
78 "type" => "Create"
79 }) do
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
82 end
83 end
84
85 defp increase_replies_count_if_reply(_create_data), do: :noop
86
87 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
91 {:ok, object, meta}
92 end
93 end
94
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
98 {:ok, activity} <-
99 Repo.insert(%Activity{
100 data: object,
101 local: local,
102 recipients: recipients,
103 actor: object["actor"]
104 }),
105 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
106 {:ok, _} <- maybe_create_activity_expiration(activity) do
107 {:ok, activity, meta}
108 end
109 end
110
111 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
112 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
113 with nil <- Activity.normalize(map),
114 map <- lazy_put_activity_defaults(map, fake),
115 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
116 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
117 {:ok, map} <- MRF.filter(map),
118 {recipients, _, _} = get_recipients(map),
119 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
120 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
121 {:ok, map, object} <- insert_full_object(map),
122 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
123 # Splice in the child object if we have one.
124 activity = Maps.put_if_present(activity, :object, object)
125
126 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
127 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
128 end)
129
130 {:ok, activity}
131 else
132 %Activity{} = activity ->
133 {:ok, activity}
134
135 {:actor_check, _} ->
136 {:error, false}
137
138 {:containment, _} = error ->
139 error
140
141 {:error, _} = error ->
142 error
143
144 {:fake, true, map, recipients} ->
145 activity = %Activity{
146 data: map,
147 local: local,
148 actor: map["actor"],
149 recipients: recipients,
150 id: "pleroma:fakeid"
151 }
152
153 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
154 {:ok, activity}
155
156 {:remote_limit_pass, _} ->
157 {:error, :remote_limit}
158
159 {:reject, _} = e ->
160 {:error, e}
161 end
162 end
163
164 defp insert_activity_with_expiration(data, local, recipients) do
165 struct = %Activity{
166 data: data,
167 local: local,
168 actor: data["actor"],
169 recipients: recipients
170 }
171
172 with {:ok, activity} <- Repo.insert(struct) do
173 maybe_create_activity_expiration(activity)
174 end
175 end
176
177 def notify_and_stream(activity) do
178 Notification.create_notifications(activity)
179
180 conversation = create_or_bump_conversation(activity, activity.actor)
181 participations = get_participations(conversation)
182 stream_out(activity)
183 stream_out_participations(participations)
184 end
185
186 defp maybe_create_activity_expiration(
187 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
188 ) do
189 with {:ok, _job} <-
190 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
191 activity_id: activity.id,
192 expires_at: expires_at
193 }) do
194 {:ok, activity}
195 end
196 end
197
198 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
199
200 defp create_or_bump_conversation(activity, actor) do
201 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
202 %User{} = user <- User.get_cached_by_ap_id(actor) do
203 Participation.mark_as_read(user, conversation)
204 {:ok, conversation}
205 end
206 end
207
208 defp get_participations({:ok, conversation}) do
209 conversation
210 |> Repo.preload(:participations, force: true)
211 |> Map.get(:participations)
212 end
213
214 defp get_participations(_), do: []
215
216 def stream_out_participations(participations) do
217 participations =
218 participations
219 |> Repo.preload(:user)
220
221 Streamer.stream("participation", participations)
222 end
223
224 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
225 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
226 conversation = Repo.preload(conversation, :participations)
227
228 last_activity_id =
229 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
230 user: user,
231 blocking_user: user
232 })
233
234 if last_activity_id do
235 stream_out_participations(conversation.participations)
236 end
237 end
238 end
239
240 def stream_out_participations(_, _), do: :noop
241
242 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
243 when data_type in ["Create", "Announce", "Delete"] do
244 activity
245 |> Topics.get_activity_topics()
246 |> Streamer.stream(activity)
247 end
248
249 def stream_out(_activity) do
250 :noop
251 end
252
253 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
254 def create(params, fake \\ false) do
255 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
256 result
257 end
258 end
259
260 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
261 additional = params[:additional] || %{}
262 # only accept false as false value
263 local = !(params[:local] == false)
264 published = params[:published]
265 quick_insert? = Config.get([:env]) == :benchmark
266
267 create_data =
268 make_create_data(
269 %{to: to, actor: actor, published: published, context: context, object: object},
270 additional
271 )
272
273 with {:ok, activity} <- insert(create_data, local, fake),
274 {:fake, false, activity} <- {:fake, fake, activity},
275 _ <- increase_replies_count_if_reply(create_data),
276 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
277 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
278 _ <- notify_and_stream(activity),
279 :ok <- maybe_federate(activity) do
280 {:ok, activity}
281 else
282 {:quick_insert, true, activity} ->
283 {:ok, activity}
284
285 {:fake, true, activity} ->
286 {:ok, activity}
287
288 {:error, message} ->
289 Repo.rollback(message)
290 end
291 end
292
293 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
294 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
295 additional = params[:additional] || %{}
296 # only accept false as false value
297 local = !(params[:local] == false)
298 published = params[:published]
299
300 listen_data =
301 make_listen_data(
302 %{to: to, actor: actor, published: published, context: context, object: object},
303 additional
304 )
305
306 with {:ok, activity} <- insert(listen_data, local),
307 _ <- notify_and_stream(activity),
308 :ok <- maybe_federate(activity) do
309 {:ok, activity}
310 end
311 end
312
313 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
314 {:ok, Activity.t()} | nil | {:error, any()}
315 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
316 with {:ok, result} <-
317 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
318 result
319 end
320 end
321
322 defp do_unfollow(follower, followed, activity_id, local) do
323 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
324 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
325 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
326 {:ok, activity} <- insert(unfollow_data, local),
327 _ <- notify_and_stream(activity),
328 :ok <- maybe_federate(activity) do
329 {:ok, activity}
330 else
331 nil -> nil
332 {:error, error} -> Repo.rollback(error)
333 end
334 end
335
336 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
337 def flag(params) do
338 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
339 result
340 end
341 end
342
343 defp do_flag(
344 %{
345 actor: actor,
346 context: _context,
347 account: account,
348 statuses: statuses,
349 content: content
350 } = params
351 ) do
352 # only accept false as false value
353 local = !(params[:local] == false)
354 forward = !(params[:forward] == false)
355
356 additional = params[:additional] || %{}
357
358 additional =
359 if forward do
360 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
361 else
362 Map.merge(additional, %{"to" => [], "cc" => []})
363 end
364
365 with flag_data <- make_flag_data(params, additional),
366 {:ok, activity} <- insert(flag_data, local),
367 {:ok, stripped_activity} <- strip_report_status_data(activity),
368 _ <- notify_and_stream(activity),
369 :ok <-
370 maybe_federate(stripped_activity) do
371 User.all_superusers()
372 |> Enum.filter(fn user -> not is_nil(user.email) end)
373 |> Enum.each(fn superuser ->
374 superuser
375 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
376 |> Pleroma.Emails.Mailer.deliver_async()
377 end)
378
379 {:ok, activity}
380 else
381 {:error, error} -> Repo.rollback(error)
382 end
383 end
384
385 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
386 def move(%User{} = origin, %User{} = target, local \\ true) do
387 params = %{
388 "type" => "Move",
389 "actor" => origin.ap_id,
390 "object" => origin.ap_id,
391 "target" => target.ap_id
392 }
393
394 with true <- origin.ap_id in target.also_known_as,
395 {:ok, activity} <- insert(params, local),
396 _ <- notify_and_stream(activity) do
397 maybe_federate(activity)
398
399 BackgroundWorker.enqueue("move_following", %{
400 "origin_id" => origin.id,
401 "target_id" => target.id
402 })
403
404 {:ok, activity}
405 else
406 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
407 err -> err
408 end
409 end
410
411 def fetch_activities_for_context_query(context, opts) do
412 public = [Constants.as_public()]
413
414 recipients =
415 if opts[:user],
416 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
417 else: public
418
419 from(activity in Activity)
420 |> maybe_preload_objects(opts)
421 |> maybe_preload_bookmarks(opts)
422 |> maybe_set_thread_muted_field(opts)
423 |> restrict_blocked(opts)
424 |> restrict_recipients(recipients, opts[:user])
425 |> restrict_filtered(opts)
426 |> where(
427 [activity],
428 fragment(
429 "?->>'type' = ? and ?->>'context' = ?",
430 activity.data,
431 "Create",
432 activity.data,
433 ^context
434 )
435 )
436 |> exclude_poll_votes(opts)
437 |> exclude_id(opts)
438 |> order_by([activity], desc: activity.id)
439 end
440
441 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
442 def fetch_activities_for_context(context, opts \\ %{}) do
443 context
444 |> fetch_activities_for_context_query(opts)
445 |> Repo.all()
446 end
447
448 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
449 FlakeId.Ecto.CompatType.t() | nil
450 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
451 context
452 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
453 |> restrict_visibility(%{visibility: "direct"})
454 |> limit(1)
455 |> select([a], a.id)
456 |> Repo.one()
457 end
458
459 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
460 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
461 opts = Map.delete(opts, :user)
462
463 [Constants.as_public()]
464 |> fetch_activities_query(opts)
465 |> restrict_unlisted(opts)
466 |> Pagination.fetch_paginated(opts, pagination)
467 end
468
469 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
470 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
471 opts
472 |> Map.put(:restrict_unlisted, true)
473 |> fetch_public_or_unlisted_activities(pagination)
474 end
475
476 @valid_visibilities ~w[direct unlisted public private]
477
478 defp restrict_visibility(query, %{visibility: visibility})
479 when is_list(visibility) do
480 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
481 from(
482 a in query,
483 where:
484 fragment(
485 "activity_visibility(?, ?, ?) = ANY (?)",
486 a.actor,
487 a.recipients,
488 a.data,
489 ^visibility
490 )
491 )
492 else
493 Logger.error("Could not restrict visibility to #{visibility}")
494 end
495 end
496
497 defp restrict_visibility(query, %{visibility: visibility})
498 when visibility in @valid_visibilities do
499 from(
500 a in query,
501 where:
502 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
503 )
504 end
505
506 defp restrict_visibility(_query, %{visibility: visibility})
507 when visibility not in @valid_visibilities do
508 Logger.error("Could not restrict visibility to #{visibility}")
509 end
510
511 defp restrict_visibility(query, _visibility), do: query
512
513 defp exclude_visibility(query, %{exclude_visibilities: visibility})
514 when is_list(visibility) do
515 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
516 from(
517 a in query,
518 where:
519 not fragment(
520 "activity_visibility(?, ?, ?) = ANY (?)",
521 a.actor,
522 a.recipients,
523 a.data,
524 ^visibility
525 )
526 )
527 else
528 Logger.error("Could not exclude visibility to #{visibility}")
529 query
530 end
531 end
532
533 defp exclude_visibility(query, %{exclude_visibilities: visibility})
534 when visibility in @valid_visibilities do
535 from(
536 a in query,
537 where:
538 not fragment(
539 "activity_visibility(?, ?, ?) = ?",
540 a.actor,
541 a.recipients,
542 a.data,
543 ^visibility
544 )
545 )
546 end
547
548 defp exclude_visibility(query, %{exclude_visibilities: visibility})
549 when visibility not in [nil | @valid_visibilities] do
550 Logger.error("Could not exclude visibility to #{visibility}")
551 query
552 end
553
554 defp exclude_visibility(query, _visibility), do: query
555
556 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
557 do: query
558
559 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
560 do: query
561
562 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
563 from(
564 a in query,
565 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
566 )
567 end
568
569 defp restrict_thread_visibility(query, _, _), do: query
570
571 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
572 params =
573 params
574 |> Map.put(:user, reading_user)
575 |> Map.put(:actor_id, user.ap_id)
576
577 %{
578 godmode: params[:godmode],
579 reading_user: reading_user
580 }
581 |> user_activities_recipients()
582 |> fetch_activities(params)
583 |> Enum.reverse()
584 end
585
586 def fetch_user_activities(user, reading_user, params \\ %{}) do
587 params =
588 params
589 |> Map.put(:type, ["Create", "Announce"])
590 |> Map.put(:user, reading_user)
591 |> Map.put(:actor_id, user.ap_id)
592 |> Map.put(:pinned_activity_ids, user.pinned_activities)
593
594 params =
595 if User.blocks?(reading_user, user) do
596 params
597 else
598 params
599 |> Map.put(:blocking_user, reading_user)
600 |> Map.put(:muting_user, reading_user)
601 end
602
603 %{
604 godmode: params[:godmode],
605 reading_user: reading_user
606 }
607 |> user_activities_recipients()
608 |> fetch_activities(params)
609 |> Enum.reverse()
610 end
611
612 def fetch_statuses(reading_user, params) do
613 params = Map.put(params, :type, ["Create", "Announce"])
614
615 %{
616 godmode: params[:godmode],
617 reading_user: reading_user
618 }
619 |> user_activities_recipients()
620 |> fetch_activities(params, :offset)
621 |> Enum.reverse()
622 end
623
624 defp user_activities_recipients(%{godmode: true}), do: []
625
626 defp user_activities_recipients(%{reading_user: reading_user}) do
627 if reading_user do
628 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
629 else
630 [Constants.as_public()]
631 end
632 end
633
634 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
635 raise "Can't use the child object without preloading!"
636 end
637
638 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
639 from(
640 [activity, object] in query,
641 where:
642 fragment(
643 "?->>'type' != ? or ?->>'actor' != ?",
644 activity.data,
645 "Announce",
646 object.data,
647 ^actor
648 )
649 )
650 end
651
652 defp restrict_announce_object_actor(query, _), do: query
653
654 defp restrict_since(query, %{since_id: ""}), do: query
655
656 defp restrict_since(query, %{since_id: since_id}) do
657 from(activity in query, where: activity.id > ^since_id)
658 end
659
660 defp restrict_since(query, _), do: query
661
662 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
663 raise_on_missing_preload()
664 end
665
666 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
667 from(
668 [_activity, object] in query,
669 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
670 )
671 end
672
673 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
674 restrict_tag_reject(query, %{tag_reject: [tag_reject]})
675 end
676
677 defp restrict_tag_reject(query, _), do: query
678
679 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
680 raise_on_missing_preload()
681 end
682
683 defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
684 from(
685 [_activity, object] in query,
686 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
687 )
688 end
689
690 defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
691 restrict_tag(query, %{tag: tag})
692 end
693
694 defp restrict_tag_all(query, _), do: query
695
696 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
697 raise_on_missing_preload()
698 end
699
700 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
701 from(
702 [_activity, object] in query,
703 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
704 )
705 end
706
707 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
708 restrict_tag(query, %{tag: [tag]})
709 end
710
711 defp restrict_tag(query, _), do: query
712
713 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
714 raise_on_missing_preload()
715 end
716
717 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
718 if has_named_binding?(query, :thread_mute) do
719 from(
720 [activity, object, thread_mute] in query,
721 group_by: [activity.id, object.id, thread_mute.id]
722 )
723 else
724 from(
725 [activity, object] in query,
726 group_by: [activity.id, object.id]
727 )
728 end
729 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
730 |> having(
731 [hashtag: hashtag],
732 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
733 )
734 end
735
736 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
737 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
738 end
739
740 defp restrict_hashtag_reject_any(query, _), do: query
741
742 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
743 raise_on_missing_preload()
744 end
745
746 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
747 Enum.reduce(
748 tags,
749 query,
750 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
751 )
752 end
753
754 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
755 restrict_hashtag_any(query, %{tag: tag})
756 end
757
758 defp restrict_hashtag_all(query, _), do: query
759
760 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
761 raise_on_missing_preload()
762 end
763
764 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
765 from(
766 [_activity, object] in query,
767 join: hashtag in assoc(object, :hashtags),
768 where: hashtag.name in ^tags
769 )
770 end
771
772 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
773 restrict_hashtag_any(query, %{tag: [tag]})
774 end
775
776 defp restrict_hashtag_any(query, _), do: query
777
778 defp raise_on_missing_preload do
779 raise "Can't use the child object without preloading!"
780 end
781
782 defp restrict_recipients(query, [], _user), do: query
783
784 defp restrict_recipients(query, recipients, nil) do
785 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
786 end
787
788 defp restrict_recipients(query, recipients, user) do
789 from(
790 activity in query,
791 where: fragment("? && ?", ^recipients, activity.recipients),
792 or_where: activity.actor == ^user.ap_id
793 )
794 end
795
796 defp restrict_local(query, %{local_only: true}) do
797 from(activity in query, where: activity.local == true)
798 end
799
800 defp restrict_local(query, _), do: query
801
802 defp restrict_actor(query, %{actor_id: actor_id}) do
803 from(activity in query, where: activity.actor == ^actor_id)
804 end
805
806 defp restrict_actor(query, _), do: query
807
808 defp restrict_type(query, %{type: type}) when is_binary(type) do
809 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
810 end
811
812 defp restrict_type(query, %{type: type}) do
813 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
814 end
815
816 defp restrict_type(query, _), do: query
817
818 defp restrict_state(query, %{state: state}) do
819 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
820 end
821
822 defp restrict_state(query, _), do: query
823
824 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
825 from(
826 [_activity, object] in query,
827 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
828 )
829 end
830
831 defp restrict_favorited_by(query, _), do: query
832
833 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
834 raise "Can't use the child object without preloading!"
835 end
836
837 defp restrict_media(query, %{only_media: true}) do
838 from(
839 [activity, object] in query,
840 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
841 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
842 )
843 end
844
845 defp restrict_media(query, _), do: query
846
847 defp restrict_replies(query, %{exclude_replies: true}) do
848 from(
849 [_activity, object] in query,
850 where: fragment("?->>'inReplyTo' is null", object.data)
851 )
852 end
853
854 defp restrict_replies(query, %{
855 reply_filtering_user: %User{} = user,
856 reply_visibility: "self"
857 }) do
858 from(
859 [activity, object] in query,
860 where:
861 fragment(
862 "?->>'inReplyTo' is null OR ? = ANY(?)",
863 object.data,
864 ^user.ap_id,
865 activity.recipients
866 )
867 )
868 end
869
870 defp restrict_replies(query, %{
871 reply_filtering_user: %User{} = user,
872 reply_visibility: "following"
873 }) do
874 from(
875 [activity, object] in query,
876 where:
877 fragment(
878 """
879 ?->>'type' != 'Create' -- This isn't a Create
880 OR ?->>'inReplyTo' is null -- this isn't a reply
881 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
882 -- unless they are the author (because authors
883 -- are also part of the recipients). This leads
884 -- to a bug that self-replies by friends won't
885 -- show up.
886 OR ? = ? -- The actor is us
887 """,
888 activity.data,
889 object.data,
890 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
891 activity.recipients,
892 activity.actor,
893 activity.actor,
894 ^user.ap_id
895 )
896 )
897 end
898
899 defp restrict_replies(query, _), do: query
900
901 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
902 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
903 end
904
905 defp restrict_reblogs(query, _), do: query
906
907 defp restrict_muted(query, %{with_muted: true}), do: query
908
909 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
910 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
911
912 query =
913 from([activity] in query,
914 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
915 where:
916 fragment(
917 "not (?->'to' \\?| ?) or ? = ?",
918 activity.data,
919 ^mutes,
920 activity.actor,
921 ^user.ap_id
922 )
923 )
924
925 unless opts[:skip_preload] do
926 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
927 else
928 query
929 end
930 end
931
932 defp restrict_muted(query, _), do: query
933
934 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
935 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
936 domain_blocks = user.domain_blocks || []
937
938 following_ap_ids = User.get_friends_ap_ids(user)
939
940 query =
941 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
942
943 from(
944 [activity, object: o] in query,
945 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
946 where:
947 fragment(
948 "((not (? && ?)) or ? = ?)",
949 activity.recipients,
950 ^blocked_ap_ids,
951 activity.actor,
952 ^user.ap_id
953 ),
954 where:
955 fragment(
956 "recipients_contain_blocked_domains(?, ?) = false",
957 activity.recipients,
958 ^domain_blocks
959 ),
960 where:
961 fragment(
962 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
963 activity.data,
964 activity.data,
965 ^blocked_ap_ids
966 ),
967 where:
968 fragment(
969 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
970 activity.actor,
971 ^domain_blocks,
972 activity.actor,
973 ^following_ap_ids
974 ),
975 where:
976 fragment(
977 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
978 o.data,
979 ^domain_blocks,
980 o.data,
981 ^following_ap_ids
982 )
983 )
984 end
985
986 defp restrict_blocked(query, _), do: query
987
988 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
989 from(
990 activity in query,
991 where:
992 fragment(
993 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
994 activity.data,
995 ^[Constants.as_public()]
996 )
997 )
998 end
999
1000 defp restrict_unlisted(query, _), do: query
1001
1002 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1003 from(activity in query, where: activity.id in ^ids)
1004 end
1005
1006 defp restrict_pinned(query, _), do: query
1007
1008 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1009 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1010
1011 from(
1012 activity in query,
1013 where:
1014 fragment(
1015 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1016 activity.data,
1017 activity.actor,
1018 ^muted_reblogs
1019 )
1020 )
1021 end
1022
1023 defp restrict_muted_reblogs(query, _), do: query
1024
1025 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1026 from(
1027 activity in query,
1028 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1029 )
1030 end
1031
1032 defp restrict_instance(query, _), do: query
1033
1034 defp restrict_filtered(query, %{user: %User{} = user}) do
1035 case Filter.compose_regex(user) do
1036 nil ->
1037 query
1038
1039 regex ->
1040 from([activity, object] in query,
1041 where:
1042 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1043 activity.actor == ^user.ap_id
1044 )
1045 end
1046 end
1047
1048 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1049 restrict_filtered(query, %{user: user})
1050 end
1051
1052 defp restrict_filtered(query, _), do: query
1053
1054 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1055
1056 defp exclude_poll_votes(query, _) do
1057 if has_named_binding?(query, :object) do
1058 from([activity, object: o] in query,
1059 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1060 )
1061 else
1062 query
1063 end
1064 end
1065
1066 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1067
1068 defp exclude_chat_messages(query, _) do
1069 if has_named_binding?(query, :object) do
1070 from([activity, object: o] in query,
1071 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1072 )
1073 else
1074 query
1075 end
1076 end
1077
1078 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1079
1080 defp exclude_invisible_actors(query, _opts) do
1081 invisible_ap_ids =
1082 User.Query.build(%{invisible: true, select: [:ap_id]})
1083 |> Repo.all()
1084 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1085
1086 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1087 end
1088
1089 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1090 from(activity in query, where: activity.id != ^id)
1091 end
1092
1093 defp exclude_id(query, _), do: query
1094
1095 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1096
1097 defp maybe_preload_objects(query, _) do
1098 query
1099 |> Activity.with_preloaded_object()
1100 end
1101
1102 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1103
1104 defp maybe_preload_bookmarks(query, opts) do
1105 query
1106 |> Activity.with_preloaded_bookmark(opts[:user])
1107 end
1108
1109 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1110 query
1111 |> Activity.with_preloaded_report_notes()
1112 end
1113
1114 defp maybe_preload_report_notes(query, _), do: query
1115
1116 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1117
1118 defp maybe_set_thread_muted_field(query, opts) do
1119 query
1120 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1121 end
1122
1123 defp maybe_order(query, %{order: :desc}) do
1124 query
1125 |> order_by(desc: :id)
1126 end
1127
1128 defp maybe_order(query, %{order: :asc}) do
1129 query
1130 |> order_by(asc: :id)
1131 end
1132
1133 defp maybe_order(query, _), do: query
1134
1135 defp fetch_activities_query_ap_ids_ops(opts) do
1136 source_user = opts[:muting_user]
1137 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1138
1139 ap_id_relationships =
1140 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1141 [:block | ap_id_relationships]
1142 else
1143 ap_id_relationships
1144 end
1145
1146 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1147
1148 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1149 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1150
1151 restrict_muted_reblogs_opts =
1152 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1153
1154 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1155 end
1156
1157 def fetch_activities_query(recipients, opts \\ %{}) do
1158 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1159 fetch_activities_query_ap_ids_ops(opts)
1160
1161 config = %{
1162 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1163 }
1164
1165 query =
1166 Activity
1167 |> distinct([a], true)
1168 |> maybe_preload_objects(opts)
1169 |> maybe_preload_bookmarks(opts)
1170 |> maybe_preload_report_notes(opts)
1171 |> maybe_set_thread_muted_field(opts)
1172 |> maybe_order(opts)
1173 |> restrict_recipients(recipients, opts[:user])
1174 |> restrict_replies(opts)
1175 |> restrict_since(opts)
1176 |> restrict_local(opts)
1177 |> restrict_actor(opts)
1178 |> restrict_type(opts)
1179 |> restrict_state(opts)
1180 |> restrict_favorited_by(opts)
1181 |> restrict_blocked(restrict_blocked_opts)
1182 |> restrict_muted(restrict_muted_opts)
1183 |> restrict_filtered(opts)
1184 |> restrict_media(opts)
1185 |> restrict_visibility(opts)
1186 |> restrict_thread_visibility(opts, config)
1187 |> restrict_reblogs(opts)
1188 |> restrict_pinned(opts)
1189 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1190 |> restrict_instance(opts)
1191 |> restrict_announce_object_actor(opts)
1192 |> restrict_filtered(opts)
1193 |> Activity.restrict_deactivated_users()
1194 |> exclude_poll_votes(opts)
1195 |> exclude_chat_messages(opts)
1196 |> exclude_invisible_actors(opts)
1197 |> exclude_visibility(opts)
1198
1199 if Config.get([:instance, :improved_hashtag_timeline]) do
1200 query
1201 |> restrict_hashtag_any(opts)
1202 |> restrict_hashtag_all(opts)
1203 |> restrict_hashtag_reject_any(opts)
1204 else
1205 query
1206 |> restrict_tag(opts)
1207 |> restrict_tag_reject(opts)
1208 |> restrict_tag_all(opts)
1209 end
1210 end
1211
1212 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1213 list_memberships = Pleroma.List.memberships(opts[:user])
1214
1215 fetch_activities_query(recipients ++ list_memberships, opts)
1216 |> Pagination.fetch_paginated(opts, pagination)
1217 |> Enum.reverse()
1218 |> maybe_update_cc(list_memberships, opts[:user])
1219 end
1220
1221 @doc """
1222 Fetch favorites activities of user with order by sort adds to favorites
1223 """
1224 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1225 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1226 user.ap_id
1227 |> Activity.Queries.by_actor()
1228 |> Activity.Queries.by_type("Like")
1229 |> Activity.with_joined_object()
1230 |> Object.with_joined_activity()
1231 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1232 |> order_by([like, _, _], desc_nulls_last: like.id)
1233 |> Pagination.fetch_paginated(
1234 Map.merge(params, %{skip_order: true}),
1235 pagination
1236 )
1237 end
1238
1239 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1240 Enum.map(activities, fn
1241 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1242 if Enum.any?(bcc, &(&1 in list_memberships)) do
1243 update_in(activity.data["cc"], &[user_ap_id | &1])
1244 else
1245 activity
1246 end
1247
1248 activity ->
1249 activity
1250 end)
1251 end
1252
1253 defp maybe_update_cc(activities, _, _), do: activities
1254
1255 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1256 from(activity in query,
1257 where:
1258 fragment("? && ?", activity.recipients, ^recipients) or
1259 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1260 ^Constants.as_public() in activity.recipients)
1261 )
1262 end
1263
1264 def fetch_activities_bounded(
1265 recipients,
1266 recipients_with_public,
1267 opts \\ %{},
1268 pagination \\ :keyset
1269 ) do
1270 fetch_activities_query([], opts)
1271 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1272 |> Pagination.fetch_paginated(opts, pagination)
1273 |> Enum.reverse()
1274 end
1275
1276 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1277 def upload(file, opts \\ []) do
1278 with {:ok, data} <- Upload.store(file, opts) do
1279 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1280
1281 Repo.insert(%Object{data: obj_data})
1282 end
1283 end
1284
1285 @spec get_actor_url(any()) :: binary() | nil
1286 defp get_actor_url(url) when is_binary(url), do: url
1287 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1288
1289 defp get_actor_url(url) when is_list(url) do
1290 url
1291 |> List.first()
1292 |> get_actor_url()
1293 end
1294
1295 defp get_actor_url(_url), do: nil
1296
1297 defp object_to_user_data(data) do
1298 avatar =
1299 data["icon"]["url"] &&
1300 %{
1301 "type" => "Image",
1302 "url" => [%{"href" => data["icon"]["url"]}]
1303 }
1304
1305 banner =
1306 data["image"]["url"] &&
1307 %{
1308 "type" => "Image",
1309 "url" => [%{"href" => data["image"]["url"]}]
1310 }
1311
1312 fields =
1313 data
1314 |> Map.get("attachment", [])
1315 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1316 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1317
1318 emojis =
1319 data
1320 |> Map.get("tag", [])
1321 |> Enum.filter(fn
1322 %{"type" => "Emoji"} -> true
1323 _ -> false
1324 end)
1325 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1326 {String.trim(name, ":"), url}
1327 end)
1328
1329 is_locked = data["manuallyApprovesFollowers"] || false
1330 capabilities = data["capabilities"] || %{}
1331 accepts_chat_messages = capabilities["acceptsChatMessages"]
1332 data = Transmogrifier.maybe_fix_user_object(data)
1333 is_discoverable = data["discoverable"] || false
1334 invisible = data["invisible"] || false
1335 actor_type = data["type"] || "Person"
1336
1337 public_key =
1338 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1339 data["publicKey"]["publicKeyPem"]
1340 else
1341 nil
1342 end
1343
1344 shared_inbox =
1345 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1346 data["endpoints"]["sharedInbox"]
1347 else
1348 nil
1349 end
1350
1351 user_data = %{
1352 ap_id: data["id"],
1353 uri: get_actor_url(data["url"]),
1354 ap_enabled: true,
1355 banner: banner,
1356 fields: fields,
1357 emoji: emojis,
1358 is_locked: is_locked,
1359 is_discoverable: is_discoverable,
1360 invisible: invisible,
1361 avatar: avatar,
1362 name: data["name"],
1363 follower_address: data["followers"],
1364 following_address: data["following"],
1365 bio: data["summary"] || "",
1366 actor_type: actor_type,
1367 also_known_as: Map.get(data, "alsoKnownAs", []),
1368 public_key: public_key,
1369 inbox: data["inbox"],
1370 shared_inbox: shared_inbox,
1371 accepts_chat_messages: accepts_chat_messages
1372 }
1373
1374 # nickname can be nil because of virtual actors
1375 if data["preferredUsername"] do
1376 Map.put(
1377 user_data,
1378 :nickname,
1379 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1380 )
1381 else
1382 Map.put(user_data, :nickname, nil)
1383 end
1384 end
1385
1386 def fetch_follow_information_for_user(user) do
1387 with {:ok, following_data} <-
1388 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1389 {:ok, hide_follows} <- collection_private(following_data),
1390 {:ok, followers_data} <-
1391 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1392 {:ok, hide_followers} <- collection_private(followers_data) do
1393 {:ok,
1394 %{
1395 hide_follows: hide_follows,
1396 follower_count: normalize_counter(followers_data["totalItems"]),
1397 following_count: normalize_counter(following_data["totalItems"]),
1398 hide_followers: hide_followers
1399 }}
1400 else
1401 {:error, _} = e -> e
1402 e -> {:error, e}
1403 end
1404 end
1405
1406 defp normalize_counter(counter) when is_integer(counter), do: counter
1407 defp normalize_counter(_), do: 0
1408
1409 def maybe_update_follow_information(user_data) do
1410 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1411 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1412 {_, true} <-
1413 {:collections_available,
1414 !!(user_data[:following_address] && user_data[:follower_address])},
1415 {:ok, info} <-
1416 fetch_follow_information_for_user(user_data) do
1417 info = Map.merge(user_data[:info] || %{}, info)
1418
1419 user_data
1420 |> Map.put(:info, info)
1421 else
1422 {:user_type_check, false} ->
1423 user_data
1424
1425 {:collections_available, false} ->
1426 user_data
1427
1428 {:enabled, false} ->
1429 user_data
1430
1431 e ->
1432 Logger.error(
1433 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1434 )
1435
1436 user_data
1437 end
1438 end
1439
1440 defp collection_private(%{"first" => %{"type" => type}})
1441 when type in ["CollectionPage", "OrderedCollectionPage"],
1442 do: {:ok, false}
1443
1444 defp collection_private(%{"first" => first}) do
1445 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1446 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1447 {:ok, false}
1448 else
1449 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1450 {:error, _} = e -> e
1451 e -> {:error, e}
1452 end
1453 end
1454
1455 defp collection_private(_data), do: {:ok, true}
1456
1457 def user_data_from_user_object(data) do
1458 with {:ok, data} <- MRF.filter(data) do
1459 {:ok, object_to_user_data(data)}
1460 else
1461 e -> {:error, e}
1462 end
1463 end
1464
1465 def fetch_and_prepare_user_from_ap_id(ap_id) do
1466 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1467 {:ok, data} <- user_data_from_user_object(data) do
1468 {:ok, maybe_update_follow_information(data)}
1469 else
1470 # If this has been deleted, only log a debug and not an error
1471 {:error, "Object has been deleted" = e} ->
1472 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1473 {:error, e}
1474
1475 {:error, {:reject, reason} = e} ->
1476 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1477 {:error, e}
1478
1479 {:error, e} ->
1480 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1481 {:error, e}
1482 end
1483 end
1484
1485 def maybe_handle_clashing_nickname(data) do
1486 with nickname when is_binary(nickname) <- data[:nickname],
1487 %User{} = old_user <- User.get_by_nickname(nickname),
1488 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1489 Logger.info(
1490 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1491 data[:ap_id]
1492 }, renaming."
1493 )
1494
1495 old_user
1496 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1497 |> User.update_and_set_cache()
1498 else
1499 {:ap_id_comparison, true} ->
1500 Logger.info(
1501 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1502 )
1503
1504 _ ->
1505 nil
1506 end
1507 end
1508
1509 def make_user_from_ap_id(ap_id) do
1510 user = User.get_cached_by_ap_id(ap_id)
1511
1512 if user && !User.ap_enabled?(user) do
1513 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1514 else
1515 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1516 if user do
1517 user
1518 |> User.remote_user_changeset(data)
1519 |> User.update_and_set_cache()
1520 else
1521 maybe_handle_clashing_nickname(data)
1522
1523 data
1524 |> User.remote_user_changeset()
1525 |> Repo.insert()
1526 |> User.set_cache()
1527 end
1528 end
1529 end
1530 end
1531
1532 def make_user_from_nickname(nickname) do
1533 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1534 make_user_from_ap_id(ap_id)
1535 else
1536 _e -> {:error, "No AP id in WebFinger"}
1537 end
1538 end
1539
1540 # filter out broken threads
1541 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1542 entire_thread_visible_for_user?(activity, user)
1543 end
1544
1545 # do post-processing on a specific activity
1546 def contain_activity(%Activity{} = activity, %User{} = user) do
1547 contain_broken_threads(activity, user)
1548 end
1549
1550 def fetch_direct_messages_query do
1551 Activity
1552 |> restrict_type(%{type: "Create"})
1553 |> restrict_visibility(%{visibility: "direct"})
1554 |> order_by([activity], asc: activity.id)
1555 end
1556 end