added total
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
36 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
37
# Collects the addressing of an activity and returns `{recipients, to, cc}`.
#
# For "Create" activities the actor is added to the recipients and duplicates
# are removed. Missing "to"/"cc"/"bcc" fields default to empty lists.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # List.wrap/1 maps a missing actor (nil) to [] so that no stray `[]` element
  # ends up in the recipients; a present actor is wrapped as before.
  actor = data |> Map.get("actor") |> List.wrap()
  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type: recipients are the plain concatenation of
# "to", "cc" and "bcc" (no de-duplication, actor not included).
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
54
# An activity without an actor passes the check.
defp check_actor_is_active(nil), do: true

# Looks the actor up through the user cache; only a found user whose
# `is_active` flag is `true` passes.
defp check_actor_is_active(actor) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end
63
# True when the embedded object's content is no longer than the configured
# `[:instance, :remote_limit]`.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

# Nothing to measure (no embedded object content): always passes.
defp check_remote_limit(_), do: true
70
# Bumps the actor's note counter when `object` is public; otherwise returns
# `{:ok, actor}` without touching anything.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
74
# Lowers the actor's note counter when `object` is public; otherwise returns
# `{:ok, actor}` without touching anything.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
78
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies
# counter of the replied-to object, but only when the new object is public.
# Returns nil when the object is not public.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = object
     }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

# Anything that is not a reply-carrying "Create" is ignored.
defp increase_replies_count_if_reply(_create_data), do: :noop
89
# Types that are stored as objects rather than as activities.
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, created} -> {:ok, created, meta}
    other -> other
  end
end

# Everything else is inserted as an activity; `meta` must carry `:local`.
@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
114
# Inserts an activity described by `map`.
#
# If the activity already exists it is returned untouched. Otherwise, in order:
# defaults are filled in, the actor is checked for being active (unless
# `bypass_actor_check`), the remote content limit is checked, MRF filters run,
# containment is verified, and the object and the activity are stored.
# With `fake` set nothing is stored and an in-memory activity with the id
# "pleroma:fakeid" is returned instead.
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       {_, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(prepared["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored_data, object} <- insert_full_object(filtered),
       {:ok, inserted} <- insert_activity_with_expiration(stored_data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    {:ok, activity}
  else
    # The activity was already known.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = containment_error ->
      containment_error

    {:reject, _} = rejection ->
      {:error, rejection}

    {:error, _} = error ->
      error
  end
end
167
# Stores the activity row and, on success, schedules its expiration if the
# data asks for one. A failed insert is returned unchanged.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
180
# Creates notifications for the activity, creates or bumps its conversation,
# then streams out the activity and the conversation's participations.
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
189
# When the activity data carries a `DateTime` under "expires_at", enqueues a
# purge job for it. A failed enqueue is returned unchanged.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    other -> other
  end
end

# No usable expiration date: nothing to schedule.
defp maybe_create_activity_expiration(activity), do: {:ok, activity}
203
# Creates or bumps the conversation for the activity and marks it as read for
# the acting user. Any step that does not match is returned as-is.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conversation} = ok <- Conversation.create_or_bump_for(activity),
       %User{} = acting_user <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(acting_user, conversation)
    ok
  end
end
211
# Force-reloads and returns the participations of a conversation.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

# No conversation was produced: there is nothing to stream.
defp get_participations(_), do: []
219
# Preloads the user of each participation and pushes them to the
# "participation" stream.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
227
# Streams the participations of the conversation that `object` belongs to, but
# only if a direct activity in that context is still found for `user`.
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      viewer_opts = %{user: user, blocking_user: user}

      latest_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, viewer_opts)

      if latest_id, do: stream_out_participations(conversation.participations)

    other ->
      other
  end
end

# Objects without a context have no conversation to stream.
@impl true
def stream_out_participations(_, _), do: :noop
247
# Pushes "Create", "Announce" and "Delete" activities to the topics computed
# for them.
@impl true
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

# Other activity types are not streamed.
@impl true
def stream_out(_activity), do: :noop
260
# Runs `do_create/2` inside a transaction and unwraps the transaction's
# `{:ok, result}`; a rolled-back transaction is returned unchanged.
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
267
# Builds the "Create" data and inserts it. Unless the insert is fake, the
# replies counter is updated; unless running in the :benchmark env, the note
# counter is updated, notifications/streams go out and the activity is
# federated. An `{:error, _}` from any step rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  base = %{to: to, actor: actor, published: published, context: context, object: object}
  create_data = make_create_data(base, additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted} ->
      {:ok, inserted}

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
300
# Builds a "Listen" activity from the params, inserts it, then notifies,
# streams and federates it.
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  base = %{to: to, actor: actor, published: published, context: context, object: object}
  listen_data = make_listen_data(base, additional)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
320
# Runs `do_unfollow/4` inside a transaction and unwraps the transaction's
# `{:ok, result}`; a rolled-back transaction is returned unchanged.
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
329
# Marks the latest follow activity between the two users as "cancelled" and
# inserts, streams and federates the matching unfollow activity.
# Returns nil when no follow activity is found; errors roll back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <-
         make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
343
# Runs `do_flag/1` inside a transaction and unwraps the transaction's
# `{:ok, result}`; a rolled-back transaction is returned unchanged.
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
350
# Inserts a report ("Flag") activity, federates a stripped copy of it and
# mails the report to every superuser that has an e-mail address.
# The reported account is only addressed (in "cc") when forwarding is on.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
392
# Inserts a "Move" activity from `origin` to `target`, federates it and
# enqueues the background job that moves the followers. The target must list
# the origin's AP id in `also_known_as`.
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    other -> other
  end
end
418
# Builds the query for the "Create" activities of a context, newest first.
# Visible recipients are the public collection plus, when `opts[:user]` is
# given, that user and everyone they follow.
def fetch_activities_for_context_query(context, opts) do
  user = opts[:user]
  public = [Constants.as_public()]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  Activity
  |> from(as: :activity)
  |> exclude(:unused_placeholder_never_applied)
end
448
# Returns every activity matched by `fetch_activities_for_context_query/2`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
455
# Returns the id of the newest direct-visibility activity in the context, or
# nil. Preloads are skipped unless `opts` overrides `:skip_preload`.
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_only =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  direct_only
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
466
# Fetches a page of activities addressed to the public collection. Any `:user`
# in `opts` is dropped first; unlisted ones are filtered out only when
# `opts` asks for it.
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
476
# Same as `fetch_public_or_unlisted_activities/2` with unlisted activities
# always filtered out.
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only, pagination)
end
483
484 @valid_visibilities ~w[direct unlisted public private]
485
# Keeps only activities whose computed visibility is one of the given values.
#
# `opts[:visibility]` may be a single visibility or a list of them. An invalid
# value is logged and the query is returned unrestricted, matching
# `exclude_visibility/2`. Previously the logger's return value was returned in
# place of the query, which broke every later step of the query pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# An explicit nil means "no restriction requested" and is not an error.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
520
# Drops activities whose computed visibility is one of the given values.
# `opts[:exclude_visibilities]` may be a list or a single visibility; invalid
# values are logged and leave the query untouched.
defp exclude_visibility(query, %{exclude_visibilities: excluded}) when is_list(excluded) do
  all_valid? = Enum.all?(excluded, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^excluded
      )
    )
  else
    Logger.error("Could not exclude visibility to #{excluded}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: excluded})
     when excluded in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^excluded
    )
  )
end

# nil is accepted silently; anything else that is not a known visibility is logged.
defp exclude_visibility(query, %{exclude_visibilities: excluded})
     when excluded not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{excluded}")
  query
end

defp exclude_visibility(query, _visibility), do: query
563
# Applies the `thread_visibility` SQL check for the reading user, unless
# thread containment is skipped by the instance config or by the user.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

# No reading user: nothing to check against.
defp restrict_thread_visibility(query, _opts, _config), do: query
578
# Fetches activities authored by `user` as seen by `reading_user`, without
# forcing an activity type, and returns them in reversed order.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
593
# Fetches the "Create"/"Announce" activities of `user` as seen by
# `reading_user`, in reversed order.
def fetch_user_activities(user, reading_user, params \\ %{})

# With `total: true` the fetch yields a keyword list; only its `:items` are reversed.
def fetch_user_activities(user, reading_user, %{total: true} = params) do
  page = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(page[:items])

  Keyword.put(page, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
607
# Shared fetch behind `fetch_user_activities/3`. Block and mute filtering for
# the reader is only switched on when the reader does not block `user`.
defp fetch_activities_for_user(user, reading_user, params) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
634
# Fetches "Create"/"Announce" activities visible to `reading_user`, in
# reversed order. With `total: true` the fetch yields a keyword list and only
# its `:items` are reversed.
def fetch_statuses(reading_user, %{total: true} = params) do
  page = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(page[:items])
  Keyword.put(page, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
645
# Shared fetch behind `fetch_statuses/2`; always uses offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
656
# In godmode no recipient restriction is applied (empty list).
defp user_activities_recipients(%{godmode: true}), do: []

# Otherwise: the public collection, plus the reader and everyone the reader
# follows when there is a reader.
defp user_activities_recipients(%{reading_user: reader}) do
  public = Constants.as_public()

  if reader, do: [public, reader.ap_id | User.following(reader)], else: [public]
end
666
# Filtering on the announced object needs the object join, which is absent
# when preloading is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

# Drops "Announce" activities whose announced object was authored by the
# filtering user.
defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
686
# An empty `since_id` is treated as absent.
defp restrict_since(query, %{since_id: ""}), do: query

# Keeps only activities with an id greater than `since_id`.
defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
694
# Tag filters read the joined object, which is absent when preloading is skipped.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

# Drops activities whose object carries any of the rejected tags.
defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected)
  )
end

defp restrict_tag_reject(query, _), do: query
707
# Tag filters read the joined object, which is absent when preloading is skipped.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

# Keeps activities whose object carries every one of the given tags.
defp restrict_tag_all(query, %{tag_all: [_ | _] = required}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required)
  )
end

defp restrict_tag_all(query, _), do: query
720
# Tag filters read the joined object, which is absent when preloading is skipped.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

# List of tags: keeps activities whose object carries at least one of them.
defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

# Single tag: keeps activities whose object carries it.
defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
740
# An empty recipient list means no restriction.
defp restrict_recipients(query, [], _user), do: query

# Without a user: the activity's recipients must overlap the given list.
defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

# With a user: as above, or the activity was authored by the user.
defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
754
# With `local_only: true`, keeps only locally created activities.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
760
# Keeps only activities authored by the given actor AP id.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
766
# Single type given as a string: exact match on the activity type.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

# Any other `:type` value is passed to SQL `ANY(...)`.
defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
776
# Keeps only activities whose data "state" equals the given value.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
782
# Keeps activities whose object lists the given AP id under "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
791
# The media filter reads the joined object, which is absent when preloading is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

# Keeps only "Create" activities whose object "attachment" is not an empty list.
defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
805
# `exclude_replies: true` keeps only objects without an "inReplyTo".
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

# reply_visibility "self": non-replies, plus replies addressed to the filtering user.
defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

# reply_visibility "following": the conditions are spelled out in the SQL
# comments inside the fragment.
defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  me_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^me_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
859
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
865
# `with_muted: true` disables mute filtering altogether.
defp restrict_muted(query, %{with_muted: true}), do: query

# Drops activities authored by muted users and activities addressed ("to") to
# muted users unless the muting user authored them. Muted threads are dropped
# as well, but only when the thread_mute binding exists (i.e. preloading was
# not skipped).
defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
892
# Applies the blocking user's user blocks and domain blocks.
# Joins the object first if the query does not have an :object binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  joined =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  joined
  # authored by a blocked user
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # addressed to a blocked user, unless the blocking user is the author
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # addressed to a blocked domain
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  # announces whose "to" contains a blocked user
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # actor on a blocked domain, unless the actor is followed
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # object author on a blocked domain, unless that author is followed
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
946
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end

defp restrict_unlisted(query, _), do: query
960
# With `pinned: true`, keeps only the activities whose id is in `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
966
# Drops "Announce" activities made by users whose reblogs the muting user has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
983
# Keeps only activities whose actor URL has the given host part.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
992
# Drops activities whose object content matches the user's composed filter
# regex, except for the user's own activities. No regex means no filtering.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

# Without `:user`, the blocking user's filters are used instead.
defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
1012
# Poll votes stay in when explicitly requested.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

# Otherwise drops activities whose object is an "Answer". This is only
# possible when the query has an :object binding.
defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
1024
# Chat messages stay in when explicitly requested.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

# Otherwise drops activities whose object is a "ChatMessage". This is only
# possible when the query has an :object binding.
defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
1036
# Activities of invisible actors stay in when explicitly requested.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

# Otherwise loads the AP ids of all invisible users and drops their activities.
defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: id} -> id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1047
# Drops the activity with the given id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1053
# Preloads the activity's object unless `skip_preload: true` is given.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1060
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is given.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1067
# Preloads report notes only when `preload_report_notes: true` is given.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1074
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is given.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1081
# Orders by id in the requested direction; no `:order` means no ordering.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1093
# Loads the muting user's outgoing relationship AP ids with one call and
# returns three option maps (for blocks, mutes and reblog mutes) with those
# ids filled in. Values already present in `opts` take precedence. Blocks are
# only loaded when the blocking user is the same as the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationships)

  restrict_blocked_opts = Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block])
  restrict_muted_opts = Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute])

  restrict_muted_reblogs_opts =
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
end
1115
@doc """
Builds the query used to list activities for `recipients`.

Starting from `Activity`, the query is passed through the preload, ordering,
`restrict_*` and `exclude_*` steps below, each of which receives `opts` (or a
variant of `opts` enriched with pre-fetched block/mute AP ids). The query is
returned without being executed.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  # Read once here and handed to restrict_thread_visibility/3.
  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # restrict_filtered/2 is applied exactly once; piping it a second time with
  # the same `opts` would only repeat the same restriction.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1159
@doc """
Runs `fetch_activities_query/2` and returns the resulting page of activities.

The values returned by `Pleroma.List.memberships/1` for `opts[:user]` are
appended to `recipients` before querying. The page produced by
`Pagination.fetch_paginated/3` is reversed, and activities are then passed
through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  memberships = Pleroma.List.memberships(viewer)

  page =
    (recipients ++ memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(memberships, viewer)
end
1168
@doc """
Fetches the activities a user has favourited.

The query starts from the user's "Like" activities, joins the liked object and
that object's activity, and returns the joined activity with `object` set and
`pagination_id` set to the id of the "Like". Results are ordered by descending
"Like" id (nulls last), so pagination follows the order in which favourites
were added rather than the ids of the favourited activities.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_with_targets =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()

  # The ordering is set explicitly here, so pagination is told to skip its own.
  pagination_params = Map.put(params, :skip_order, true)

  likes_with_targets
  |> select([l, o, a], %{a | object: o, pagination_id: l.id})
  |> order_by([l, _, _], desc_nulls_last: l.id)
  |> Pagination.fetch_paginated(pagination_params, pagination)
end
1186
# For every activity whose "bcc" contains at least one entry of
# `list_memberships`, prepends the given user's AP id to the activity's "cc".
# Other activities are returned unchanged.
#
# Only applies when `list_memberships` is a non-empty list and a %User{} is
# given; see the fallback clause below.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a single value; List.wrap/1 normalises
        # both so that prepending never yields an improper list.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

# No list memberships or no user: nothing to rewrite.
defp maybe_update_cc(activities, _, _), do: activities
1202
# Restricts `query` to activities whose `recipients` either
#   * satisfy the `&&` fragment against `recipients`, or
#   * satisfy the `&&` fragment against `recipients_with_public` and also
#     contain the public address (`Constants.as_public()`).
# (`&&` is presumably the PostgreSQL array-overlap operator — confirm against
# the column type.)
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1211
@doc """
Fetches a page of activities bounded by two recipient lists.

The base query is built by `fetch_activities_query/2` with an empty recipient
list; the recipient restriction itself comes from
`fetch_activities_bounded_query/3`. The page returned by
`Pagination.fetch_paginated/3` is reversed before being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1223
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` holding the
resulting data.

When `opts[:actor]` is given it is added to the object data under `"actor"`
(via `Maps.put_if_present/3`). Any result of `Upload.store/2` other than
`{:ok, data}` is returned as-is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1232
# Extracts a URL string from the value of an actor's "url" property.
#
# Accepts a plain string, a map with a binary "href", or a list of such values
# (only the first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    link when is_binary(link) -> link
    %{"href" => href} when is_binary(href) -> href
    [first | _] -> get_actor_url(first)
    _ -> nil
  end
end
1244
# Converts a decoded, string-keyed actor object into the atom-keyed attribute
# map used to build a remote user.
#
# Avatar, banner, profile fields, emoji, `is_locked` and the chat capability
# are read from `data` as received; everything else is read after the object
# has been passed through Transmogrifier.maybe_fix_user_object/1.
#
# The returned map always contains `:nickname`, which is nil when the object
# has no "preferredUsername".
defp object_to_user_data(data) do
  # Wraps `data[key]["url"]` in an "Image" map, or passes the falsy value
  # through when there is no URL.
  image_for = fn key ->
    url = data[key]["url"]
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  avatar = image_for.("icon")
  banner = image_for.("image")

  # Attachments are supplied by the remote server: entries that are not
  # "PropertyValue" maps (including entries without any "type") are skipped
  # rather than raising.
  fields =
    for %{"type" => "PropertyValue"} = field <- Map.get(data, "attachment", []) do
      Map.take(field, ["name", "value"])
    end

  # Likewise, only "Emoji" tags that carry both a binary name and an icon URL
  # are kept; malformed tags are ignored instead of crashing the conversion.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          Map.get(data, "tag", []),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  # From here on `data` is the fixed-up object.
  data = Transmogrifier.maybe_fix_user_object(data)

  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data do
      %{"publicKey" => %{"publicKeyPem" => pem}} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data do
      %{"endpoints" => %{"sharedInbox" => inbox}} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1333
@doc """
Fetches the remote following and follower collections of `user` and summarises
them.

On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
`:following_count` and `:follower_count`. The counts come from each
collection's `"totalItems"` (0 when that is not an integer); the `hide_*`
flags come from `collection_private/1`.

Any failing step is returned as `{:error, reason}`; results that are not
already an `{:error, _}` tuple are wrapped in one.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      hide_followers: followers_hidden,
      following_count: normalize_counter(following["totalItems"]),
      follower_count: normalize_counter(followers["totalItems"])
    }

    {:ok, info}
  else
    {:error, _} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1353
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1356
@doc """
Merges remote follow information into `user_data[:info]` when allowed.

The information is fetched with `fetch_follow_information_for_user/1` only if
all of the following hold:

  * `[:instance, :external_user_synchronization]` is `true`,
  * `user_data[:type]` is `"Person"` or `"Service"`,
  * both `:following_address` and `:follower_address` are set.

When one of these checks is `false`, `user_data` is returned unchanged. Any
other outcome (including a failed fetch) is logged as an error and
`user_data` is likewise returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the gate reads `user_data[:type]`, whereas
  # object_to_user_data/1 in this module stores the actor type under
  # `:actor_type` — confirm which key callers are expected to supply.
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged)
  else
    {check, false} when check in [:enabled, :user_type_check, :collections_available] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
1387
# Decides whether a remote collection should be treated as private.
#
# Returns `{:ok, false}` when the collection's "first" page is (or can be
# fetched as) a CollectionPage / OrderedCollectionPage, `{:ok, true}` when
# fetching that page is answered with 401 or 403 or when the collection has no
# "first" at all, and `{:error, reason}` for every other outcome.

# "first" is embedded in the collection and already is a page.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

# "first" is something else: try to fetch it.
defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = failure ->
      failure

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1404
@doc """
Runs an actor object through `MRF.filter/1` and converts the filtered object
into user attributes.

Returns `{:ok, user_data}` on success; any other result of the filter is
wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1412
@doc """
Fetches the remote actor at `ap_id` and turns it into user attributes.

On success returns `{:ok, user_data}`, where `user_data` has been passed
through `maybe_update_follow_information/1`. An `{:error, reason}` from either
step is logged and returned unchanged; the log level depends on the reason:
debug for deleted objects, info for rejections, error otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = failure ->
      case reason do
        # If this has been deleted, only log a debug and not an error
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      failure
  end
end
1432
@doc """
Frees up `data[:nickname]` when it is already held by a different user.

If a user with that nickname exists and has a different AP id than
`data[:ap_id]`, that user's nickname is changed to `"<id>.<nickname>"` and the
result of the update is returned. If the AP ids are equal, only an info
message is logged. Returns `nil` when the nickname is not a binary or no user
holds it.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  holder = if is_binary(nickname), do: User.get_by_nickname(nickname)

  case holder do
    %User{} = old_user ->
      if data[:ap_id] == old_user.ap_id do
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        renamed = %{nickname: "#{old_user.id}.#{old_user.nickname}"}

        old_user
        |> User.remote_user_changeset(renamed)
        |> User.update_and_set_cache()
      end

    _ ->
      nil
  end
end
1456
@doc """
Creates or refreshes the user identified by `ap_id`.

  * A cached user for which `User.ap_enabled?/1` is false is handed to
    `Transmogrifier.upgrade_user_from_ap_id/1`.
  * Otherwise the remote actor is fetched; a cached user is updated with the
    fetched data, and when there is none a new user is inserted (after
    resolving a possible nickname clash).

A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached = User.get_cached_by_ap_id(ap_id)

  if cached && !User.ap_enabled?(cached) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached do
          changeset = User.remote_user_changeset(cached, data)
          User.update_and_set_cache(changeset)
        else
          maybe_handle_clashing_nickname(data)

          changeset = User.remote_user_changeset(data)

          changeset
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1479
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from the
AP id found there.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield
`{:ok, %{"ap_id" => ap_id}}` with a non-nil `ap_id`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _ ->
      {:error, "No AP id in WebFinger"}
  end
end
1487
# Filters out broken threads: returns the result of
# entire_thread_visible_for_user?/2 for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1492
# Post-processing hook for a single activity. Currently it only delegates to
# contain_broken_threads/2 and returns its result.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1497
@doc """
Builds (without executing) a query for "Create" activities restricted to
"direct" visibility, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  creates = restrict_type(Activity, %{type: "Create"})
  direct_creates = restrict_visibility(creates, %{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1504 end