0923e19e48c2488d281d80097c2a8bd4c6b7a130
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`. `to`, `cc` and `bcc` are read from `data`
# and default to `[]` when the key is absent.
#
# For "Create" activities the actor is appended to the recipients and
# duplicates are removed. A missing (or `nil`) actor contributes nothing:
# the previous `[]` default was wrapped into `[[]]`, which leaked a stray
# `[]` element into the recipient list.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  actor_recipients =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients =
    [to, cc, bcc, actor_recipients]
    |> Enum.concat()
    |> Enum.uniq()

  {recipients, to, cc}
end

# Every other activity type: plain concatenation of to/cc/bcc, with
# duplicates kept.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
54
# Tells whether the actor of an activity may publish.
#
# A missing actor (`nil`) passes. A binary AP id passes only when the cached
# user lookup finds a user whose `deactivated` flag is false; an unknown
# actor fails.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
63
# Checks the object's content against the configured
# `[:instance, :remote_limit]` length (measured with `String.length/1`).
#
# Only binary content is measured. The previous guard (`not is_nil/1`) let
# non-string content (e.g. a map or list) reach `String.length/1`, which
# raises instead of producing a result. Anything that is not a string now
# falls through to the permissive clause, as does any activity without
# embedded object content.
defp check_remote_limit(%{"object" => %{"content" => content}}) when is_binary(content) do
  limit = Config.get([:instance, :remote_limit])
  String.length(content) <= limit
end

defp check_remote_limit(_), do: true
70
@doc """
Increases the note count of `actor` when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
@doc """
Decreases the note count of `actor` when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# Bumps the replies counter of the parent object when `create_data` is a
# public "Create" whose object replies to something.
#
# The `is_binary/1` guard keeps an explicit `"inReplyTo" => nil` (or any
# other non-string value) from being treated as a reply. Everything that is
# not counted returns `:noop`, including non-public replies, which previously
# returned `nil` from the else-less `if`.
defp increase_replies_count_if_reply(%{
       "object" => %{"inReplyTo" => reply_ap_id} = object,
       "type" => "Create"
     })
     when is_binary(reply_ap_id) do
  if is_public?(object) do
    Object.increase_replies_count(reply_ap_id)
  else
    :noop
  end
end

defp increase_replies_count_if_reply(_create_data), do: :noop
89
# Types that are persisted as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    failure -> failure
  end
end

# Everything else is stored as an activity. `meta` must carry `:local`
# (`Keyword.fetch!/2` raises otherwise).
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
@doc """
Inserts the activity described by `map`.

If `Activity.normalize/1` already yields an activity for `map`, that activity
is returned and nothing is inserted. Otherwise the map receives its defaults
and passes, in order, the actor check (skipped when `bypass_actor_check` is
true), the remote content limit, the MRF filter and the containment check
before the embedded object and the activity itself are stored.

With `fake` set, nothing is written: an in-memory activity with the id
`"pleroma:fakeid"` is returned instead.

Failures are reported as `{:error, false}` (actor check),
`{:error, :remote_limit}`, `{:error, {:reject, reason}}` (MRF), the
`{:containment, _}` tuple as-is, or any `{:error, _}` from a later step.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map),
       {:ok, inserted} <- insert_activity_with_expiration(map, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}

    {:containment, _} = failure ->
      failure

    {:error, _} = failure ->
      failure
  end
end
167
# Stores a new activity row and, when the insert succeeds, hands it to
# maybe_create_activity_expiration/1. An insert failure is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  new_activity = %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }

  case Repo.insert(new_activity) do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# Enqueues a PurgeExpiredActivity job when the activity data carries a
# `DateTime` under "expires_at"; any other activity passes through untouched.
# Returns `{:ok, activity}` on success, or whatever the enqueue call returned
# when it is not `{:ok, _}`.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; if either lookup does not
# match, that non-matching value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    no_conversation ->
      no_conversation
  end
end
211
# Loads the participations of a conversation (force-reloading the
# association); anything other than `{:ok, conversation}` yields `[]`.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
219
@doc """
Preloads the user of each participation and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
# Streams the participations of the conversation that `object` belongs to,
# but only when `user` can still see a direct activity in that context.
# Objects without a "context", or without a matching conversation, are a
# no-op.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    no_conversation ->
      no_conversation
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
247
# Streams Create, Announce and Delete activities to the topics computed for
# them; every other activity is a no-op.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
260
@doc """
Creates a "Create" activity from `params` inside a database transaction.

Returns the result of the creation steps when the transaction commits, or the
transaction's own non-`{:ok, _}` result otherwise.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
267
# Transaction body of create/2: builds the Create data, inserts it, then
# updates the reply/note counters, notifies, streams and federates.
#
# Fake activities stop right after the insert. When `[:env]` is configured
# as `:benchmark` the steps after the replies counter are skipped. Any
# `{:error, message}` rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  fields = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  create_data = make_create_data(fields, extra)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} ->
      Repo.rollback(message)

    # Early exits: the activity was inserted (or faked) and is returned as-is.
    {tag, true, activity} when tag in [:fake, :quick_insert] ->
      {:ok, activity}
  end
end
300
@doc """
Builds the listen activity data from `params`, inserts it, then notifies,
streams and federates it.

`params[:local]` counts as local unless it is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  fields = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(fields, params[:additional] || %{})

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
@doc """
Cancels `follower`'s follow of `followed` inside a database transaction.

Returns `nil` when no follow activity exists between the two users.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  unfollow_fun = fn -> do_unfollow(follower, followed, activity_id, local) end

  case Repo.transaction(unfollow_fun) do
    {:ok, result} -> result
    failure -> failure
  end
end
329
# Transaction body of unfollow/4: marks the latest follow activity as
# "cancelled", inserts the unfollow activity built from it, then notifies,
# streams and federates. `{:error, _}` rolls the transaction back; a missing
# follow activity yields `nil`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
343
@doc """
Files a report (flag) described by `params` inside a database transaction.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
350
# Transaction body of flag/1.
#
# The flag is addressed to nobody ("to" is empty); the reported account is
# put in "cc" only when forwarding is enabled (`params[:forward]` is anything
# but `false`). The full activity is used for notifications and streaming,
# the stripped one for federation. After success every superuser with an
# email address is sent the report email. `{:error, _}` rolls back.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
392
@doc """
Inserts a "Move" activity from `origin` to `target`.

The move is refused unless `origin.ap_id` is listed in
`target.also_known_as`. After a successful insert the activity is
notified/streamed and federated, and a "move_following" background job is
enqueued.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if origin.ap_id in target.also_known_as do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
418
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Without `opts[:user]` only publicly addressed activities match; with a user,
activities addressed to the user or to anyone the user follows match as well.
The block, filter, poll-vote and `:exclude_id` restrictions are applied from
`opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  viewer = opts[:user]

  recipients =
    if viewer do
      [viewer.ap_id | User.following(viewer)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_recipients(recipients, viewer)
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
449
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching
activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
456
@doc """
Returns the id of the newest direct-visibility activity in `context`, or
`nil` when there is none.

Preloads are skipped by default (`skip_preload: true`); a value given in
`opts` takes precedence.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_only =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  from(a in direct_only, limit: 1, select: a.id)
  |> Repo.one()
end
467
@doc """
Fetches a page of publicly addressed activities.

`:user` is dropped from `opts` before querying. Unlisted activities are
filtered out only when `opts[:restrict_unlisted]` is `true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_query = fetch_activities_query([Constants.as_public()], anonymous_opts)

  public_query
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
477
@doc """
Fetches a page of public activities, excluding unlisted ones.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only, pagination)
end
484
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose computed visibility (the
# `activity_visibility` SQL function) equals `opts[:visibility]`, or is one
# of them when a list is given.
#
# An invalid visibility is logged and the query is returned unrestricted,
# matching exclude_visibility/2. The invalid branches used to return the
# result of `Logger.error/1`, which replaced the query in the caller's
# pipeline with a non-query value. `visibility: nil` is treated as "no
# restriction" without logging, as in exclude_visibility/2.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
521
# Drops activities whose computed visibility (the `activity_visibility` SQL
# function) equals, or is among, `opts[:exclude_visibilities]`. An invalid
# value is logged and the query is returned unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn v -> v in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
564
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped by the config map (third argument) or by the
# user's own `skip_thread_containment` flag. Without a user the query is
# left alone.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _, _), do: query
579
@doc """
Fetches activities authored by `user` as seen by `reading_user`, with no
restriction on activity type, and reverses the fetched list.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
594
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`, and reverses the fetched list.

The reader's blocks and mutes are applied unless the reader blocks `user`.
Pagination defaults to `:keyset` and can be overridden with
`params[:pagination_type]`.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, pagination_type)
  |> Enum.reverse()
end
622
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination, and reverses the fetched list.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
634
# Recipient list used to scope a reader's view: empty in godmode, the public
# collection for anonymous readers, and for a logged-in reader additionally
# the reader and everyone the reader follows.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: anonymous}) when anonymous in [nil, false] do
  [Constants.as_public()]
end

defp user_activities_recipients(%{reading_user: reading_user}) do
  [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
end
644
# Hides Announces of objects authored by `announce_filtering_user`. Needs the
# joined object, so combining it with `skip_preload: true` raises.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
664
# Keeps only activities with an id greater than `since_id`; an empty string
# or a missing key leaves the query alone.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
672
# Drops activities whose object carries any of the `tag_reject` tags. Needs
# the joined object, so combining it with `skip_preload: true` raises.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
685
# Keeps only activities whose object carries every one of the `tag_all`
# tags. Needs the joined object, so combining it with `skip_preload: true`
# raises.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
698
# Keeps only activities whose object carries the given tag (binary) or any
# of the given tags (list). Needs the joined object, so combining it with
# `skip_preload: true` raises.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
718
# Keeps activities whose recipients overlap `recipients`. An empty list
# means no restriction. With a user, an `or_where` on the user's own
# authorship is added as well.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
732
# Keeps only local activities when `local_only: true` is given.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
738
# Keeps only activities authored by `actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
744
# Keeps only activities of the given type: a single type when `type` is a
# binary, otherwise any of the given types (SQL `ANY`).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
754
# Keeps only activities whose data "state" equals `state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
760
# Keeps only activities whose object lists `ap_id` among its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
769
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, so combining `only_media`
# with `skip_preload: true` raises.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
783
# Reply filtering.
#
#   * `exclude_replies: true` - drop every activity whose object is a reply.
#   * `reply_visibility: "self"` - keep non-replies and activities addressed
#     to the filtering user.
#   * `reply_visibility: "following"` - see the conditions documented inside
#     the SQL fragment below.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
837
# Drops "Announce" activities when `exclude_reblogs: true` is given.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
843
# Hides activities from, or addressed ("to") to, users muted by
# `muting_user`; the muting user's own posts are exempt from the "to" check.
# Unless preloading was skipped, rows with a thread mute (`thread_mute`
# binding) are dropped too. `with_muted: true` disables the filter.
#
# The muted AP ids come from `opts[:muted_users_ap_ids]` when present,
# otherwise from `User.muted_users_ap_ids/1`.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
870
# Applies the blocks of `blocking_user` to the query. The object is joined
# first when the query has no `:object` binding yet.
#
# Blocked AP ids come from `opts[:blocked_users_ap_ids]` when present,
# otherwise from `User.blocked_users_ap_ids/1`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
937
# Unless `[:activitypub, :blockers_visible]` is configured as `true`, hides
# activities authored by users who block `blocking_user`, and boosts
# addressed ("to") to such users.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

defp restrict_blockers_visibility(query, _), do: query
962
# With `restrict_unlisted: true`, drops activities that name the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
976
# With `pinned: true`, keeps only the activities listed in
# `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
982
# Hides Announces made by users whose reblogs `muting_user` has muted. The
# AP ids come from `opts[:reblog_muted_users_ap_ids]` when present,
# otherwise from `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
999
# Keeps only activities whose actor URL has `instance` as its host part (the
# third "/"-separated segment of the `actor` column).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
1008
# Hides activities whose object content matches the regex that
# `Filter.compose_regex/1` builds for the user; the user's own posts are
# always kept. When `compose_regex/1` returns `nil` the query is unchanged.
# `blocking_user` is used as the user when no `:user` is given.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1028
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true` is given. Without an `:object` binding the
# query is left alone.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1040
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true` is given. Without an `:object` binding the
# query is left alone.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1052
# Drops activities authored by invisible users, unless
# `invisible_actors: true` is given. The invisible users' AP ids are loaded
# with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1063
# Drops the activity with the given id when `exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1069
# Preloads the activities' objects unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1076
# Preloads the bookmark of `opts[:user]` for each activity unless
# `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1083
# Preloads report notes only when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1090
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1097
# Orders by id in the direction given by `opts[:order]` (`:desc` or `:asc`);
# any other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1109
# Loads the muting user's outgoing relationship AP ids in one call and
# returns three option maps, for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2, each carrying the matching preloaded list.
#
# Blocks are included only when `blocking_user` is the same user as
# `muting_user`. Keys already present in `opts` take precedence over the
# preloaded values.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_kinds = if source_user, do: [:mute, :reblog_mute], else: []

  same_blocking_user? = opts[:blocking_user] && opts[:blocking_user] == source_user

  relationship_kinds =
    if same_blocking_user? do
      [:block | mute_kinds]
    else
      mute_kinds
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationship_kinds)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  }
end
1131
@doc """
Builds the query used to fetch activities for `recipients`.

Starting from `Activity`, the query is passed through the preload, ordering,
`restrict_*` and `exclude_*` steps below. Each step receives `opts`, except
the block, mute and reblog-mute steps, which receive copies of `opts` extended
by `fetch_activities_query_ap_ids_ops/1`.

Returns the composed query. Building it can already hit the database (for
example `exclude_invisible_actors/2` runs `Repo.all/1`).
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_blockers_visibility(opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once in this pipeline.
  # NOTE(review): assumes restrict_filtered/2 only adds a condition derived
  # from `opts`, so applying it a second time would add nothing.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1176
@doc """
Fetches a page of activities for `recipients`.

The list memberships of `opts[:user]` are appended to `recipients` before the
query is built. The paginated result is reversed and then passed through
`maybe_update_cc/3` together with those memberships and the user.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  memberships = Pleroma.List.memberships(user)

  page =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(memberships, user)
end
1185
@doc """
Fetches the activities favourited by `user`.

Selects the `Like` activities made by the user, joined with the liked object
and its activity. Each returned activity carries that object, and its
`pagination_id` is set to the id of the like. Results are ordered by like id,
descending; `skip_order: true` is passed to the pagination call.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1203
# When the user has list memberships, prepends the user's ap_id to the "cc" of
# every activity whose non-empty "bcc" contains at least one of those
# memberships. All other activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  addressed_via_list = fn
    %{data: %{"bcc" => [_ | _] = bcc}} ->
      Enum.any?(bcc, fn recipient -> recipient in list_memberships end)

    _other ->
      false
  end

  for activity <- activities do
    if addressed_via_list.(activity) do
      update_in(activity.data["cc"], fn cc -> [user_ap_id | cc] end)
    else
      activity
    end
  end
end

# Without list memberships or without a user there is nothing to rewrite.
defp maybe_update_cc(activities, _list_memberships, _user) do
  activities
end
1219
# Keeps activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` while also containing the public address
# returned by `Constants.as_public()`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1228
@doc """
Fetches a page of activities bounded by two recipient lists.

The base query is built with an empty recipient list and `opts`, then narrowed
by `fetch_activities_bounded_query/3` using `recipients` and
`recipients_with_public`. The paginated result is returned reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  base_query = fetch_activities_query([], opts)
  bounded_query = fetch_activities_bounded_query(base_query, recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1240
# Stores `file` through `Upload.store/2` and inserts an `Object` holding the
# returned data, with "actor" added via `Maps.put_if_present/3` from
# `opts[:actor]`. Any result of the store step other than `{:ok, data}` is
# returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      Repo.insert(%Object{data: Maps.put_if_present(stored, "actor", opts[:actor])})

    failure ->
      failure
  end
end
1249
# Extracts a URL string from the "url" value of an actor document.
#
# Accepts a binary (returned as-is), a map with a binary "href", or a list,
# in which case only the first element is considered. Anything else,
# including an empty list, yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url) do
  url
end

defp get_actor_url(%{"href" => href}) when is_binary(href) do
  href
end

defp get_actor_url([first | _rest]) do
  get_actor_url(first)
end

defp get_actor_url(_other) do
  nil
end
1261
# Converts a remote actor document (a string-keyed map) into the atom-keyed
# attribute map assembled at the end of this function.
#
# The document is produced by remote servers, so its optional parts are read
# defensively instead of raising:
#   * a non-map "icon" / "image", or one without a "url", yields a nil
#     avatar / banner;
#   * "attachment" and "tag" may be nil, a single object or a list;
#   * attachments that are not `PropertyValue` maps are ignored;
#   * Emoji tags without an icon URL or a string name are ignored.
#
# Returns the attribute map. `:nickname` is "preferredUsername@host-of-id",
# or nil when the document has no "preferredUsername".
defp object_to_user_data(data) do
  # Wraps the "url" of an icon/image map in the Image shape used for
  # avatar and banner; nil when there is no usable URL.
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _other ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # Elements that do not match the generator pattern are skipped; for repeated
  # names the last tag wins.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name}
        when is_binary(name) <- List.wrap(Map.get(data, "tag", [])),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  # Everything below reads from the document returned by
  # Transmogrifier.maybe_fix_user_object/1; everything above reads the
  # document as received.
  data = Transmogrifier.maybe_fix_user_object(data)
  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  Map.put(user_data, :nickname, nickname)
end
1350
@doc """
Fetches the following and follower collections of `user` and summarises them.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`
(from `collection_private/1`) and `:following_count`, `:follower_count` (the
collections' "totalItems", normalised by `normalize_counter/1`).

An `{:error, _}` from any step is returned as-is; any other unexpected value
is wrapped as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  outcome =
    with {:ok, following} <-
           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
         {:ok, follows_hidden} <- collection_private(following),
         {:ok, followers} <-
           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
         {:ok, followers_hidden} <- collection_private(followers) do
      summary = %{
        hide_follows: follows_hidden,
        follower_count: normalize_counter(followers["totalItems"]),
        following_count: normalize_counter(following["totalItems"]),
        hide_followers: followers_hidden
      }

      {:ok, summary}
    end

  case outcome do
    {:ok, _summary} = success -> success
    {:error, _reason} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1370
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1373
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

The fetch only happens when all of the following hold, checked in this order:
the `[:instance, :external_user_synchronization]` setting is `true`,
`user_data[:type]` is "Person" or "Service", and both `:following_address`
and `:follower_address` are set. When one of these checks is `false`,
`user_data` is returned unchanged. Any other failure is logged as an error
and `user_data` is returned unchanged as well.
"""
# NOTE(review): this reads `user_data[:type]`, while object_to_user_data/1 in
# this module stores the actor type under `:actor_type` - confirm which key
# callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, follow_info)
    Map.put(user_data, :info, merged_info)
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1404
# Decides whether a follow collection is private.
#
# Returns `{:ok, false}` when "first" is an embedded collection page, or when
# fetching the "first" reference yields one. Returns `{:ok, true}` when that
# fetch fails with a 401/403 response, or when the collection has no "first"
# at all. Other `{:error, _}` results are passed through and any other value
# is wrapped as `{:error, value}`.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data) do
  {:ok, true}
end
1421
@doc """
Runs the actor document through `MRF.filter/1` and converts the filtered
document with `object_to_user_data/1`.

Returns `{:ok, user_data}`; any other result of the filter is wrapped as
`{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1429
@doc """
Fetches the actor document at `ap_id` and turns it into user data.

On success returns `{:ok, user_data}`, after passing the data through
`maybe_update_follow_information/1`. On `{:error, reason}` the tuple is
returned unchanged after logging: at debug level for a deleted object, at
info level for a `{:reject, reason}`, and at error level otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, actor_object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor_object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = failure ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, rejection} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(rejection)}")

        _other ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      failure
  end
end
1449
@doc """
Renames an existing user that already holds the nickname in `data`.

When `data[:nickname]` is a binary and a user with that nickname exists under
a different ap_id, that user's nickname is changed to "<id>.<nickname>" and
the result of the update is returned. When the existing user has the same
ap_id, this is only logged. In every other case `nil` is returned.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  new_ap_id = data[:ap_id]

  case is_binary(nickname) && User.get_by_nickname(nickname) do
    %User{ap_id: old_ap_id} when old_ap_id == new_ap_id ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    %User{} = old_user ->
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: renamed})
      |> User.update_and_set_cache()

    _no_clash ->
      nil
  end
end
1473
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user for which `User.ap_enabled?/1` is false is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched
and prepared: a cached user is updated with the new data, while for an
unknown actor a clashing nickname is handled first and a new user is then
inserted and cached. A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)
  needs_upgrade = cached_user && !User.ap_enabled?(cached_user)

  if needs_upgrade do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, user_data} ->
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, user_data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(user_data)

          user_data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1496
@doc """
Resolves `nickname` through `WebFinger.finger/1` and creates or refreshes the
user via `make_user_from_ap_id/1`.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _lookup_failed ->
      {:error, "No AP id in WebFinger"}
  end
end
1504
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1509
# Post-processing for a single activity; currently this only runs the
# broken-thread check for `user`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1514
@doc """
Builds a query over `Activity` restricted to type "Create" and visibility
"direct", ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{type: "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1521 end