Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36
37 defp get_recipients(%{"type" => "Create"} = data) do
38 to = Map.get(data, "to", [])
39 cc = Map.get(data, "cc", [])
40 bcc = Map.get(data, "bcc", [])
41 actor = Map.get(data, "actor", [])
42 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
43 {recipients, to, cc}
44 end
45
46 defp get_recipients(data) do
47 to = Map.get(data, "to", [])
48 cc = Map.get(data, "cc", [])
49 bcc = Map.get(data, "bcc", [])
50 recipients = Enum.concat([to, cc, bcc])
51 {recipients, to, cc}
52 end
53
54 defp check_actor_is_active(nil), do: true
55
56 defp check_actor_is_active(actor) when is_binary(actor) do
57 case User.get_cached_by_ap_id(actor) do
58 %User{deactivated: deactivated} -> not deactivated
59 _ -> false
60 end
61 end
62
63 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
64 limit = Config.get([:instance, :remote_limit])
65 String.length(content) <= limit
66 end
67
68 defp check_remote_limit(_), do: true
69
70 def increase_note_count_if_public(actor, object) do
71 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
72 end
73
74 def decrease_note_count_if_public(actor, object) do
75 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
76 end
77
78 defp increase_replies_count_if_reply(%{
79 "object" => %{"inReplyTo" => reply_ap_id} = object,
80 "type" => "Create"
81 }) do
82 if is_public?(object) do
83 Object.increase_replies_count(reply_ap_id)
84 end
85 end
86
87 defp increase_replies_count_if_reply(_create_data), do: :noop
88
89 @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
90 @impl true
91 def persist(%{"type" => type} = object, meta) when type in @object_types do
92 with {:ok, object} <- Object.create(object) do
93 {:ok, object, meta}
94 end
95 end
96
97 @impl true
98 def persist(object, meta) do
99 with local <- Keyword.fetch!(meta, :local),
100 {recipients, _, _} <- get_recipients(object),
101 {:ok, activity} <-
102 Repo.insert(%Activity{
103 data: object,
104 local: local,
105 recipients: recipients,
106 actor: object["actor"]
107 }),
108 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
109 {:ok, _} <- maybe_create_activity_expiration(activity) do
110 {:ok, activity, meta}
111 end
112 end
113
114 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
115 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
119 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
124 {:ok, map, object} <- insert_full_object(map),
125 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
126 # Splice in the child object if we have one.
127 activity = Maps.put_if_present(activity, :object, object)
128
129 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
130 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
131 end)
132
133 {:ok, activity}
134 else
135 %Activity{} = activity ->
136 {:ok, activity}
137
138 {:actor_check, _} ->
139 {:error, false}
140
141 {:containment, _} = error ->
142 error
143
144 {:error, _} = error ->
145 error
146
147 {:fake, true, map, recipients} ->
148 activity = %Activity{
149 data: map,
150 local: local,
151 actor: map["actor"],
152 recipients: recipients,
153 id: "pleroma:fakeid"
154 }
155
156 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
157 {:ok, activity}
158
159 {:remote_limit_pass, _} ->
160 {:error, :remote_limit}
161
162 {:reject, _} = e ->
163 {:error, e}
164 end
165 end
166
167 defp insert_activity_with_expiration(data, local, recipients) do
168 struct = %Activity{
169 data: data,
170 local: local,
171 actor: data["actor"],
172 recipients: recipients
173 }
174
175 with {:ok, activity} <- Repo.insert(struct) do
176 maybe_create_activity_expiration(activity)
177 end
178 end
179
180 def notify_and_stream(activity) do
181 Notification.create_notifications(activity)
182
183 conversation = create_or_bump_conversation(activity, activity.actor)
184 participations = get_participations(conversation)
185 stream_out(activity)
186 stream_out_participations(participations)
187 end
188
189 defp maybe_create_activity_expiration(
190 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
191 ) do
192 with {:ok, _job} <-
193 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
194 activity_id: activity.id,
195 expires_at: expires_at
196 }) do
197 {:ok, activity}
198 end
199 end
200
201 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
202
203 defp create_or_bump_conversation(activity, actor) do
204 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
205 %User{} = user <- User.get_cached_by_ap_id(actor) do
206 Participation.mark_as_read(user, conversation)
207 {:ok, conversation}
208 end
209 end
210
211 defp get_participations({:ok, conversation}) do
212 conversation
213 |> Repo.preload(:participations, force: true)
214 |> Map.get(:participations)
215 end
216
217 defp get_participations(_), do: []
218
219 def stream_out_participations(participations) do
220 participations =
221 participations
222 |> Repo.preload(:user)
223
224 Streamer.stream("participation", participations)
225 end
226
227 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
228 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
229 conversation = Repo.preload(conversation, :participations)
230
231 last_activity_id =
232 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
233 user: user,
234 blocking_user: user
235 })
236
237 if last_activity_id do
238 stream_out_participations(conversation.participations)
239 end
240 end
241 end
242
243 def stream_out_participations(_, _), do: :noop
244
245 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
246 when data_type in ["Create", "Announce", "Delete"] do
247 activity
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(activity)
250 end
251
252 def stream_out(_activity) do
253 :noop
254 end
255
256 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
257 def create(params, fake \\ false) do
258 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
259 result
260 end
261 end
262
263 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
264 additional = params[:additional] || %{}
265 # only accept false as false value
266 local = !(params[:local] == false)
267 published = params[:published]
268 quick_insert? = Config.get([:env]) == :benchmark
269
270 create_data =
271 make_create_data(
272 %{to: to, actor: actor, published: published, context: context, object: object},
273 additional
274 )
275
276 with {:ok, activity} <- insert(create_data, local, fake),
277 {:fake, false, activity} <- {:fake, fake, activity},
278 _ <- increase_replies_count_if_reply(create_data),
279 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
280 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
281 _ <- notify_and_stream(activity),
282 :ok <- maybe_federate(activity) do
283 {:ok, activity}
284 else
285 {:quick_insert, true, activity} ->
286 {:ok, activity}
287
288 {:fake, true, activity} ->
289 {:ok, activity}
290
291 {:error, message} ->
292 Repo.rollback(message)
293 end
294 end
295
296 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
297 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
298 additional = params[:additional] || %{}
299 # only accept false as false value
300 local = !(params[:local] == false)
301 published = params[:published]
302
303 listen_data =
304 make_listen_data(
305 %{to: to, actor: actor, published: published, context: context, object: object},
306 additional
307 )
308
309 with {:ok, activity} <- insert(listen_data, local),
310 _ <- notify_and_stream(activity),
311 :ok <- maybe_federate(activity) do
312 {:ok, activity}
313 end
314 end
315
316 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
317 {:ok, Activity.t()} | nil | {:error, any()}
318 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
319 with {:ok, result} <-
320 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
321 result
322 end
323 end
324
325 defp do_unfollow(follower, followed, activity_id, local) do
326 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
327 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
328 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
329 {:ok, activity} <- insert(unfollow_data, local),
330 _ <- notify_and_stream(activity),
331 :ok <- maybe_federate(activity) do
332 {:ok, activity}
333 else
334 nil -> nil
335 {:error, error} -> Repo.rollback(error)
336 end
337 end
338
339 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
340 def flag(params) do
341 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
342 result
343 end
344 end
345
346 defp do_flag(
347 %{
348 actor: actor,
349 context: _context,
350 account: account,
351 statuses: statuses,
352 content: content
353 } = params
354 ) do
355 # only accept false as false value
356 local = !(params[:local] == false)
357 forward = !(params[:forward] == false)
358
359 additional = params[:additional] || %{}
360
361 additional =
362 if forward do
363 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
364 else
365 Map.merge(additional, %{"to" => [], "cc" => []})
366 end
367
368 with flag_data <- make_flag_data(params, additional),
369 {:ok, activity} <- insert(flag_data, local),
370 {:ok, stripped_activity} <- strip_report_status_data(activity),
371 _ <- notify_and_stream(activity),
372 :ok <-
373 maybe_federate(stripped_activity) do
374 User.all_superusers()
375 |> Enum.filter(fn user -> not is_nil(user.email) end)
376 |> Enum.each(fn superuser ->
377 superuser
378 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
379 |> Pleroma.Emails.Mailer.deliver_async()
380 end)
381
382 {:ok, activity}
383 else
384 {:error, error} -> Repo.rollback(error)
385 end
386 end
387
388 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
389 def move(%User{} = origin, %User{} = target, local \\ true) do
390 params = %{
391 "type" => "Move",
392 "actor" => origin.ap_id,
393 "object" => origin.ap_id,
394 "target" => target.ap_id
395 }
396
397 with true <- origin.ap_id in target.also_known_as,
398 {:ok, activity} <- insert(params, local),
399 _ <- notify_and_stream(activity) do
400 maybe_federate(activity)
401
402 BackgroundWorker.enqueue("move_following", %{
403 "origin_id" => origin.id,
404 "target_id" => target.id
405 })
406
407 {:ok, activity}
408 else
409 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
410 err -> err
411 end
412 end
413
414 def fetch_activities_for_context_query(context, opts) do
415 public = [Constants.as_public()]
416
417 recipients =
418 if opts[:user],
419 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
420 else: public
421
422 from(activity in Activity)
423 |> maybe_preload_objects(opts)
424 |> maybe_preload_bookmarks(opts)
425 |> maybe_set_thread_muted_field(opts)
426 |> restrict_blocked(opts)
427 |> restrict_recipients(recipients, opts[:user])
428 |> restrict_filtered(opts)
429 |> where(
430 [activity],
431 fragment(
432 "?->>'type' = ? and ?->>'context' = ?",
433 activity.data,
434 "Create",
435 activity.data,
436 ^context
437 )
438 )
439 |> exclude_poll_votes(opts)
440 |> exclude_id(opts)
441 |> order_by([activity], desc: activity.id)
442 end
443
444 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
445 def fetch_activities_for_context(context, opts \\ %{}) do
446 context
447 |> fetch_activities_for_context_query(opts)
448 |> Repo.all()
449 end
450
451 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
452 FlakeId.Ecto.CompatType.t() | nil
453 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
454 context
455 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
456 |> restrict_visibility(%{visibility: "direct"})
457 |> limit(1)
458 |> select([a], a.id)
459 |> Repo.one()
460 end
461
462 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
463 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
464 opts = Map.delete(opts, :user)
465
466 [Constants.as_public()]
467 |> fetch_activities_query(opts)
468 |> restrict_unlisted(opts)
469 |> Pagination.fetch_paginated(opts, pagination)
470 end
471
472 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
473 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
474 opts
475 |> Map.put(:restrict_unlisted, true)
476 |> fetch_public_or_unlisted_activities(pagination)
477 end
478
479 @valid_visibilities ~w[direct unlisted public private]
480
481 defp restrict_visibility(query, %{visibility: visibility})
482 when is_list(visibility) do
483 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
484 from(
485 a in query,
486 where:
487 fragment(
488 "activity_visibility(?, ?, ?) = ANY (?)",
489 a.actor,
490 a.recipients,
491 a.data,
492 ^visibility
493 )
494 )
495 else
496 Logger.error("Could not restrict visibility to #{visibility}")
497 end
498 end
499
500 defp restrict_visibility(query, %{visibility: visibility})
501 when visibility in @valid_visibilities do
502 from(
503 a in query,
504 where:
505 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
506 )
507 end
508
509 defp restrict_visibility(_query, %{visibility: visibility})
510 when visibility not in @valid_visibilities do
511 Logger.error("Could not restrict visibility to #{visibility}")
512 end
513
514 defp restrict_visibility(query, _visibility), do: query
515
516 defp exclude_visibility(query, %{exclude_visibilities: visibility})
517 when is_list(visibility) do
518 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
519 from(
520 a in query,
521 where:
522 not fragment(
523 "activity_visibility(?, ?, ?) = ANY (?)",
524 a.actor,
525 a.recipients,
526 a.data,
527 ^visibility
528 )
529 )
530 else
531 Logger.error("Could not exclude visibility to #{visibility}")
532 query
533 end
534 end
535
536 defp exclude_visibility(query, %{exclude_visibilities: visibility})
537 when visibility in @valid_visibilities do
538 from(
539 a in query,
540 where:
541 not fragment(
542 "activity_visibility(?, ?, ?) = ?",
543 a.actor,
544 a.recipients,
545 a.data,
546 ^visibility
547 )
548 )
549 end
550
551 defp exclude_visibility(query, %{exclude_visibilities: visibility})
552 when visibility not in [nil | @valid_visibilities] do
553 Logger.error("Could not exclude visibility to #{visibility}")
554 query
555 end
556
557 defp exclude_visibility(query, _visibility), do: query
558
559 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
560 do: query
561
562 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
563 do: query
564
565 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
566 from(
567 a in query,
568 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
569 )
570 end
571
572 defp restrict_thread_visibility(query, _, _), do: query
573
574 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
575 params =
576 params
577 |> Map.put(:user, reading_user)
578 |> Map.put(:actor_id, user.ap_id)
579
580 %{
581 godmode: params[:godmode],
582 reading_user: reading_user
583 }
584 |> user_activities_recipients()
585 |> fetch_activities(params)
586 |> Enum.reverse()
587 end
588
589 def fetch_user_activities(user, reading_user, params \\ %{}) do
590 params =
591 params
592 |> Map.put(:type, ["Create", "Announce"])
593 |> Map.put(:user, reading_user)
594 |> Map.put(:actor_id, user.ap_id)
595 |> Map.put(:pinned_activity_ids, user.pinned_activities)
596
597 params =
598 if User.blocks?(reading_user, user) do
599 params
600 else
601 params
602 |> Map.put(:blocking_user, reading_user)
603 |> Map.put(:muting_user, reading_user)
604 end
605
606 %{
607 godmode: params[:godmode],
608 reading_user: reading_user
609 }
610 |> user_activities_recipients()
611 |> fetch_activities(params)
612 |> Enum.reverse()
613 end
614
615 def fetch_statuses(reading_user, params) do
616 params = Map.put(params, :type, ["Create", "Announce"])
617
618 %{
619 godmode: params[:godmode],
620 reading_user: reading_user
621 }
622 |> user_activities_recipients()
623 |> fetch_activities(params, :offset)
624 |> Enum.reverse()
625 end
626
627 defp user_activities_recipients(%{godmode: true}), do: []
628
629 defp user_activities_recipients(%{reading_user: reading_user}) do
630 if reading_user do
631 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
632 else
633 [Constants.as_public()]
634 end
635 end
636
637 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
638 raise "Can't use the child object without preloading!"
639 end
640
641 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
642 from(
643 [activity, object] in query,
644 where:
645 fragment(
646 "?->>'type' != ? or ?->>'actor' != ?",
647 activity.data,
648 "Announce",
649 object.data,
650 ^actor
651 )
652 )
653 end
654
655 defp restrict_announce_object_actor(query, _), do: query
656
657 defp restrict_since(query, %{since_id: ""}), do: query
658
659 defp restrict_since(query, %{since_id: since_id}) do
660 from(activity in query, where: activity.id > ^since_id)
661 end
662
663 defp restrict_since(query, _), do: query
664
665 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
666 raise_on_missing_preload()
667 end
668
669 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
670 from(
671 [_activity, object] in query,
672 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
673 )
674 end
675
676 defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
677 restrict_tag_reject(query, %{tag_reject: [tag_reject]})
678 end
679
680 defp restrict_tag_reject(query, _), do: query
681
682 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
683 raise_on_missing_preload()
684 end
685
686 defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
687 from(
688 [_activity, object] in query,
689 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
690 )
691 end
692
693 defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
694 restrict_tag(query, %{tag: tag})
695 end
696
697 defp restrict_tag_all(query, _), do: query
698
699 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
700 raise_on_missing_preload()
701 end
702
703 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
704 from(
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
707 )
708 end
709
710 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
711 restrict_tag(query, %{tag: [tag]})
712 end
713
714 defp restrict_tag(query, _), do: query
715
716 defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
717 raise_on_missing_preload()
718 end
719
720 defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
721 if has_named_binding?(query, :thread_mute) do
722 from(
723 [activity, object, thread_mute] in query,
724 group_by: [activity.id, object.id, thread_mute.id]
725 )
726 else
727 from(
728 [activity, object] in query,
729 group_by: [activity.id, object.id]
730 )
731 end
732 |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
733 |> having(
734 [hashtag: hashtag],
735 fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
736 )
737 end
738
739 defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
740 restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
741 end
742
743 defp restrict_hashtag_reject_any(query, _), do: query
744
745 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
746 raise_on_missing_preload()
747 end
748
749 defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
750 Enum.reduce(
751 tags,
752 query,
753 fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
754 )
755 end
756
757 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
758 restrict_hashtag_any(query, %{tag: tag})
759 end
760
761 defp restrict_hashtag_all(query, _), do: query
762
763 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
764 raise_on_missing_preload()
765 end
766
767 defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
768 from(
769 [_activity, object] in query,
770 join: hashtag in assoc(object, :hashtags),
771 where: hashtag.name in ^tags
772 )
773 end
774
775 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
776 restrict_hashtag_any(query, %{tag: [tag]})
777 end
778
779 defp restrict_hashtag_any(query, _), do: query
780
781 defp raise_on_missing_preload do
782 raise "Can't use the child object without preloading!"
783 end
784
785 defp restrict_recipients(query, [], _user), do: query
786
787 defp restrict_recipients(query, recipients, nil) do
788 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
789 end
790
791 defp restrict_recipients(query, recipients, user) do
792 from(
793 activity in query,
794 where: fragment("? && ?", ^recipients, activity.recipients),
795 or_where: activity.actor == ^user.ap_id
796 )
797 end
798
799 defp restrict_local(query, %{local_only: true}) do
800 from(activity in query, where: activity.local == true)
801 end
802
803 defp restrict_local(query, _), do: query
804
805 defp restrict_actor(query, %{actor_id: actor_id}) do
806 from(activity in query, where: activity.actor == ^actor_id)
807 end
808
809 defp restrict_actor(query, _), do: query
810
811 defp restrict_type(query, %{type: type}) when is_binary(type) do
812 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
813 end
814
815 defp restrict_type(query, %{type: type}) do
816 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
817 end
818
819 defp restrict_type(query, _), do: query
820
821 defp restrict_state(query, %{state: state}) do
822 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
823 end
824
825 defp restrict_state(query, _), do: query
826
827 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
828 from(
829 [_activity, object] in query,
830 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
831 )
832 end
833
834 defp restrict_favorited_by(query, _), do: query
835
836 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
837 raise "Can't use the child object without preloading!"
838 end
839
840 defp restrict_media(query, %{only_media: true}) do
841 from(
842 [activity, object] in query,
843 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
844 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
845 )
846 end
847
848 defp restrict_media(query, _), do: query
849
850 defp restrict_replies(query, %{exclude_replies: true}) do
851 from(
852 [_activity, object] in query,
853 where: fragment("?->>'inReplyTo' is null", object.data)
854 )
855 end
856
857 defp restrict_replies(query, %{
858 reply_filtering_user: %User{} = user,
859 reply_visibility: "self"
860 }) do
861 from(
862 [activity, object] in query,
863 where:
864 fragment(
865 "?->>'inReplyTo' is null OR ? = ANY(?)",
866 object.data,
867 ^user.ap_id,
868 activity.recipients
869 )
870 )
871 end
872
873 defp restrict_replies(query, %{
874 reply_filtering_user: %User{} = user,
875 reply_visibility: "following"
876 }) do
877 from(
878 [activity, object] in query,
879 where:
880 fragment(
881 """
882 ?->>'type' != 'Create' -- This isn't a Create
883 OR ?->>'inReplyTo' is null -- this isn't a reply
884 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
885 -- unless they are the author (because authors
886 -- are also part of the recipients). This leads
887 -- to a bug that self-replies by friends won't
888 -- show up.
889 OR ? = ? -- The actor is us
890 """,
891 activity.data,
892 object.data,
893 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
894 activity.recipients,
895 activity.actor,
896 activity.actor,
897 ^user.ap_id
898 )
899 )
900 end
901
902 defp restrict_replies(query, _), do: query
903
904 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
905 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
906 end
907
908 defp restrict_reblogs(query, _), do: query
909
910 defp restrict_muted(query, %{with_muted: true}), do: query
911
912 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
913 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
914
915 query =
916 from([activity] in query,
917 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
918 where:
919 fragment(
920 "not (?->'to' \\?| ?) or ? = ?",
921 activity.data,
922 ^mutes,
923 activity.actor,
924 ^user.ap_id
925 )
926 )
927
928 unless opts[:skip_preload] do
929 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
930 else
931 query
932 end
933 end
934
935 defp restrict_muted(query, _), do: query
936
937 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
938 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
939 domain_blocks = user.domain_blocks || []
940
941 following_ap_ids = User.get_friends_ap_ids(user)
942
943 query =
944 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
945
946 from(
947 [activity, object: o] in query,
948 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
949 where:
950 fragment(
951 "((not (? && ?)) or ? = ?)",
952 activity.recipients,
953 ^blocked_ap_ids,
954 activity.actor,
955 ^user.ap_id
956 ),
957 where:
958 fragment(
959 "recipients_contain_blocked_domains(?, ?) = false",
960 activity.recipients,
961 ^domain_blocks
962 ),
963 where:
964 fragment(
965 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
966 activity.data,
967 activity.data,
968 ^blocked_ap_ids
969 ),
970 where:
971 fragment(
972 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
973 activity.actor,
974 ^domain_blocks,
975 activity.actor,
976 ^following_ap_ids
977 ),
978 where:
979 fragment(
980 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
981 o.data,
982 ^domain_blocks,
983 o.data,
984 ^following_ap_ids
985 )
986 )
987 end
988
989 defp restrict_blocked(query, _), do: query
990
991 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
992 from(
993 activity in query,
994 where:
995 fragment(
996 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
997 activity.data,
998 ^[Constants.as_public()]
999 )
1000 )
1001 end
1002
1003 defp restrict_unlisted(query, _), do: query
1004
1005 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
1006 from(activity in query, where: activity.id in ^ids)
1007 end
1008
1009 defp restrict_pinned(query, _), do: query
1010
1011 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1012 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1013
1014 from(
1015 activity in query,
1016 where:
1017 fragment(
1018 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1019 activity.data,
1020 activity.actor,
1021 ^muted_reblogs
1022 )
1023 )
1024 end
1025
1026 defp restrict_muted_reblogs(query, _), do: query
1027
1028 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1029 from(
1030 activity in query,
1031 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1032 )
1033 end
1034
1035 defp restrict_instance(query, _), do: query
1036
1037 defp restrict_filtered(query, %{user: %User{} = user}) do
1038 case Filter.compose_regex(user) do
1039 nil ->
1040 query
1041
1042 regex ->
1043 from([activity, object] in query,
1044 where:
1045 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1046 activity.actor == ^user.ap_id
1047 )
1048 end
1049 end
1050
1051 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1052 restrict_filtered(query, %{user: user})
1053 end
1054
1055 defp restrict_filtered(query, _), do: query
1056
1057 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1058
1059 defp exclude_poll_votes(query, _) do
1060 if has_named_binding?(query, :object) do
1061 from([activity, object: o] in query,
1062 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1063 )
1064 else
1065 query
1066 end
1067 end
1068
1069 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1070
1071 defp exclude_chat_messages(query, _) do
1072 if has_named_binding?(query, :object) do
1073 from([activity, object: o] in query,
1074 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1075 )
1076 else
1077 query
1078 end
1079 end
1080
1081 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1082
1083 defp exclude_invisible_actors(query, _opts) do
1084 invisible_ap_ids =
1085 User.Query.build(%{invisible: true, select: [:ap_id]})
1086 |> Repo.all()
1087 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1088
1089 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1090 end
1091
1092 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1093 from(activity in query, where: activity.id != ^id)
1094 end
1095
1096 defp exclude_id(query, _), do: query
1097
1098 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1099
1100 defp maybe_preload_objects(query, _) do
1101 query
1102 |> Activity.with_preloaded_object()
1103 end
1104
1105 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1106
1107 defp maybe_preload_bookmarks(query, opts) do
1108 query
1109 |> Activity.with_preloaded_bookmark(opts[:user])
1110 end
1111
1112 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1113 query
1114 |> Activity.with_preloaded_report_notes()
1115 end
1116
1117 defp maybe_preload_report_notes(query, _), do: query
1118
1119 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1120
1121 defp maybe_set_thread_muted_field(query, opts) do
1122 query
1123 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1124 end
1125
1126 defp maybe_order(query, %{order: :desc}) do
1127 query
1128 |> order_by(desc: :id)
1129 end
1130
1131 defp maybe_order(query, %{order: :asc}) do
1132 query
1133 |> order_by(asc: :id)
1134 end
1135
1136 defp maybe_order(query, _), do: query
1137
1138 defp fetch_activities_query_ap_ids_ops(opts) do
1139 source_user = opts[:muting_user]
1140 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1141
1142 ap_id_relationships =
1143 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1144 [:block | ap_id_relationships]
1145 else
1146 ap_id_relationships
1147 end
1148
1149 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1150
1151 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1152 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1153
1154 restrict_muted_reblogs_opts =
1155 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1156
1157 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1158 end
1159
1160 def fetch_activities_query(recipients, opts \\ %{}) do
1161 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1162 fetch_activities_query_ap_ids_ops(opts)
1163
1164 config = %{
1165 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1166 }
1167
1168 query =
1169 Activity
1170 |> distinct([a], true)
1171 |> maybe_preload_objects(opts)
1172 |> maybe_preload_bookmarks(opts)
1173 |> maybe_preload_report_notes(opts)
1174 |> maybe_set_thread_muted_field(opts)
1175 |> maybe_order(opts)
1176 |> restrict_recipients(recipients, opts[:user])
1177 |> restrict_replies(opts)
1178 |> restrict_since(opts)
1179 |> restrict_local(opts)
1180 |> restrict_actor(opts)
1181 |> restrict_type(opts)
1182 |> restrict_state(opts)
1183 |> restrict_favorited_by(opts)
1184 |> restrict_blocked(restrict_blocked_opts)
1185 |> restrict_muted(restrict_muted_opts)
1186 |> restrict_filtered(opts)
1187 |> restrict_media(opts)
1188 |> restrict_visibility(opts)
1189 |> restrict_thread_visibility(opts, config)
1190 |> restrict_reblogs(opts)
1191 |> restrict_pinned(opts)
1192 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1193 |> restrict_instance(opts)
1194 |> restrict_announce_object_actor(opts)
1195 |> restrict_filtered(opts)
1196 |> Activity.restrict_deactivated_users()
1197 |> exclude_poll_votes(opts)
1198 |> exclude_chat_messages(opts)
1199 |> exclude_invisible_actors(opts)
1200 |> exclude_visibility(opts)
1201
1202 if Config.get([:instance, :improved_hashtag_timeline]) do
1203 query
1204 |> restrict_hashtag_any(opts)
1205 |> restrict_hashtag_all(opts)
1206 |> restrict_hashtag_reject_any(opts)
1207 else
1208 query
1209 |> restrict_tag(opts)
1210 |> restrict_tag_reject(opts)
1211 |> restrict_tag_all(opts)
1212 end
1213 end
1214
1215 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1216 list_memberships = Pleroma.List.memberships(opts[:user])
1217
1218 fetch_activities_query(recipients ++ list_memberships, opts)
1219 |> Pagination.fetch_paginated(opts, pagination)
1220 |> Enum.reverse()
1221 |> maybe_update_cc(list_memberships, opts[:user])
1222 end
1223
1224 @doc """
1225 Fetch favorites activities of user with order by sort adds to favorites
1226 """
1227 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1228 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1229 user.ap_id
1230 |> Activity.Queries.by_actor()
1231 |> Activity.Queries.by_type("Like")
1232 |> Activity.with_joined_object()
1233 |> Object.with_joined_activity()
1234 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1235 |> order_by([like, _, _], desc_nulls_last: like.id)
1236 |> Pagination.fetch_paginated(
1237 Map.merge(params, %{skip_order: true}),
1238 pagination
1239 )
1240 end
1241
1242 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1243 Enum.map(activities, fn
1244 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1245 if Enum.any?(bcc, &(&1 in list_memberships)) do
1246 update_in(activity.data["cc"], &[user_ap_id | &1])
1247 else
1248 activity
1249 end
1250
1251 activity ->
1252 activity
1253 end)
1254 end
1255
1256 defp maybe_update_cc(activities, _, _), do: activities
1257
1258 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1259 from(activity in query,
1260 where:
1261 fragment("? && ?", activity.recipients, ^recipients) or
1262 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1263 ^Constants.as_public() in activity.recipients)
1264 )
1265 end
1266
1267 def fetch_activities_bounded(
1268 recipients,
1269 recipients_with_public,
1270 opts \\ %{},
1271 pagination \\ :keyset
1272 ) do
1273 fetch_activities_query([], opts)
1274 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1275 |> Pagination.fetch_paginated(opts, pagination)
1276 |> Enum.reverse()
1277 end
1278
1279 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1280 def upload(file, opts \\ []) do
1281 with {:ok, data} <- Upload.store(file, opts) do
1282 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1283
1284 Repo.insert(%Object{data: obj_data})
1285 end
1286 end
1287
1288 @spec get_actor_url(any()) :: binary() | nil
1289 defp get_actor_url(url) when is_binary(url), do: url
1290 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1291
1292 defp get_actor_url(url) when is_list(url) do
1293 url
1294 |> List.first()
1295 |> get_actor_url()
1296 end
1297
1298 defp get_actor_url(_url), do: nil
1299
1300 defp object_to_user_data(data) do
1301 avatar =
1302 data["icon"]["url"] &&
1303 %{
1304 "type" => "Image",
1305 "url" => [%{"href" => data["icon"]["url"]}]
1306 }
1307
1308 banner =
1309 data["image"]["url"] &&
1310 %{
1311 "type" => "Image",
1312 "url" => [%{"href" => data["image"]["url"]}]
1313 }
1314
1315 fields =
1316 data
1317 |> Map.get("attachment", [])
1318 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1319 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1320
1321 emojis =
1322 data
1323 |> Map.get("tag", [])
1324 |> Enum.filter(fn
1325 %{"type" => "Emoji"} -> true
1326 _ -> false
1327 end)
1328 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1329 {String.trim(name, ":"), url}
1330 end)
1331
1332 is_locked = data["manuallyApprovesFollowers"] || false
1333 capabilities = data["capabilities"] || %{}
1334 accepts_chat_messages = capabilities["acceptsChatMessages"]
1335 data = Transmogrifier.maybe_fix_user_object(data)
1336 is_discoverable = data["discoverable"] || false
1337 invisible = data["invisible"] || false
1338 actor_type = data["type"] || "Person"
1339
1340 public_key =
1341 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1342 data["publicKey"]["publicKeyPem"]
1343 else
1344 nil
1345 end
1346
1347 shared_inbox =
1348 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1349 data["endpoints"]["sharedInbox"]
1350 else
1351 nil
1352 end
1353
1354 user_data = %{
1355 ap_id: data["id"],
1356 uri: get_actor_url(data["url"]),
1357 ap_enabled: true,
1358 banner: banner,
1359 fields: fields,
1360 emoji: emojis,
1361 is_locked: is_locked,
1362 is_discoverable: is_discoverable,
1363 invisible: invisible,
1364 avatar: avatar,
1365 name: data["name"],
1366 follower_address: data["followers"],
1367 following_address: data["following"],
1368 bio: data["summary"] || "",
1369 actor_type: actor_type,
1370 also_known_as: Map.get(data, "alsoKnownAs", []),
1371 public_key: public_key,
1372 inbox: data["inbox"],
1373 shared_inbox: shared_inbox,
1374 accepts_chat_messages: accepts_chat_messages
1375 }
1376
1377 # nickname can be nil because of virtual actors
1378 if data["preferredUsername"] do
1379 Map.put(
1380 user_data,
1381 :nickname,
1382 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1383 )
1384 else
1385 Map.put(user_data, :nickname, nil)
1386 end
1387 end
1388
1389 def fetch_follow_information_for_user(user) do
1390 with {:ok, following_data} <-
1391 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1392 {:ok, hide_follows} <- collection_private(following_data),
1393 {:ok, followers_data} <-
1394 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1395 {:ok, hide_followers} <- collection_private(followers_data) do
1396 {:ok,
1397 %{
1398 hide_follows: hide_follows,
1399 follower_count: normalize_counter(followers_data["totalItems"]),
1400 following_count: normalize_counter(following_data["totalItems"]),
1401 hide_followers: hide_followers
1402 }}
1403 else
1404 {:error, _} = e -> e
1405 e -> {:error, e}
1406 end
1407 end
1408
1409 defp normalize_counter(counter) when is_integer(counter), do: counter
1410 defp normalize_counter(_), do: 0
1411
1412 def maybe_update_follow_information(user_data) do
1413 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1414 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1415 {_, true} <-
1416 {:collections_available,
1417 !!(user_data[:following_address] && user_data[:follower_address])},
1418 {:ok, info} <-
1419 fetch_follow_information_for_user(user_data) do
1420 info = Map.merge(user_data[:info] || %{}, info)
1421
1422 user_data
1423 |> Map.put(:info, info)
1424 else
1425 {:user_type_check, false} ->
1426 user_data
1427
1428 {:collections_available, false} ->
1429 user_data
1430
1431 {:enabled, false} ->
1432 user_data
1433
1434 e ->
1435 Logger.error(
1436 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1437 )
1438
1439 user_data
1440 end
1441 end
1442
1443 defp collection_private(%{"first" => %{"type" => type}})
1444 when type in ["CollectionPage", "OrderedCollectionPage"],
1445 do: {:ok, false}
1446
1447 defp collection_private(%{"first" => first}) do
1448 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1449 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1450 {:ok, false}
1451 else
1452 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1453 {:error, _} = e -> e
1454 e -> {:error, e}
1455 end
1456 end
1457
1458 defp collection_private(_data), do: {:ok, true}
1459
1460 def user_data_from_user_object(data) do
1461 with {:ok, data} <- MRF.filter(data) do
1462 {:ok, object_to_user_data(data)}
1463 else
1464 e -> {:error, e}
1465 end
1466 end
1467
1468 def fetch_and_prepare_user_from_ap_id(ap_id) do
1469 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1470 {:ok, data} <- user_data_from_user_object(data) do
1471 {:ok, maybe_update_follow_information(data)}
1472 else
1473 # If this has been deleted, only log a debug and not an error
1474 {:error, "Object has been deleted" = e} ->
1475 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1476 {:error, e}
1477
1478 {:error, {:reject, reason} = e} ->
1479 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1480 {:error, e}
1481
1482 {:error, e} ->
1483 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1484 {:error, e}
1485 end
1486 end
1487
1488 def maybe_handle_clashing_nickname(data) do
1489 with nickname when is_binary(nickname) <- data[:nickname],
1490 %User{} = old_user <- User.get_by_nickname(nickname),
1491 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1492 Logger.info(
1493 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1494 data[:ap_id]
1495 }, renaming."
1496 )
1497
1498 old_user
1499 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1500 |> User.update_and_set_cache()
1501 else
1502 {:ap_id_comparison, true} ->
1503 Logger.info(
1504 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1505 )
1506
1507 _ ->
1508 nil
1509 end
1510 end
1511
1512 def make_user_from_ap_id(ap_id) do
1513 user = User.get_cached_by_ap_id(ap_id)
1514
1515 if user && !User.ap_enabled?(user) do
1516 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1517 else
1518 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1519 if user do
1520 user
1521 |> User.remote_user_changeset(data)
1522 |> User.update_and_set_cache()
1523 else
1524 maybe_handle_clashing_nickname(data)
1525
1526 data
1527 |> User.remote_user_changeset()
1528 |> Repo.insert()
1529 |> User.set_cache()
1530 end
1531 end
1532 end
1533 end
1534
1535 def make_user_from_nickname(nickname) do
1536 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1537 make_user_from_ap_id(ap_id)
1538 else
1539 _e -> {:error, "No AP id in WebFinger"}
1540 end
1541 end
1542
1543 # filter out broken threads
1544 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1545 entire_thread_visible_for_user?(activity, user)
1546 end
1547
1548 # do post-processing on a specific activity
1549 def contain_activity(%Activity{} = activity, %User{} = user) do
1550 contain_broken_threads(activity, user)
1551 end
1552
1553 def fetch_direct_messages_query do
1554 Activity
1555 |> restrict_type(%{type: "Create"})
1556 |> restrict_visibility(%{visibility: "direct"})
1557 |> order_by([activity], asc: activity.id)
1558 end
1559 end