Speed up instance timeline query
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
# Collects the addressing of an activity.
#
# Returns `{recipients, to, cc}`: `to` and `cc` are the raw "to"/"cc" values
# (defaulting to `[]`), `recipients` is the concatenation of "to", "cc" and "bcc".
# For "Create" activities the actor is added as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # A missing (or nil) actor contributes nothing. Wrapping the old `[]` default
  # produced `[[]]`, which put a stray empty list among the recipients.
  actor = List.wrap(Map.get(data, "actor"))

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
52
# A missing actor passes the check.
defp check_actor_is_active(nil), do: true

# Looks the actor up with `User.get_cached_by_ap_id/1`. A found user passes when it
# is not deactivated; any other lookup result fails the check.
defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
61
# Returns `true` when the object's "content" is no longer than the configured
# `[:instance, :remote_limit]`. Activities without (non-nil) content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  content_length = String.length(content)
  content_length <= max_length
end

defp check_remote_limit(_activity), do: true
68
# Bumps the actor's note count through `User.increase_note_count/1` when `object`
# is public; otherwise returns `{:ok, actor}` untouched.
defp increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
72
@doc """
Lowers the actor's note count through `User.decrease_note_count/1` when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76
# For a "Create" whose object has an "inReplyTo", increments the replies count of
# the replied-to object, but only when the new object is public (otherwise the
# result is `nil`). Any other input yields `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
87
# A "Create" whose object carries both "inReplyTo" and "name" is counted as a vote:
# it is forwarded to `Object.increase_vote_count/3`. Anything else yields `:noop`.
defp increase_poll_votes_if_vote(%{
       "type" => "Create",
       "actor" => voter,
       "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
     }) do
  Object.increase_vote_count(poll_ap_id, option_name, voter)
end

defp increase_poll_votes_if_vote(_create_data), do: :noop
97
@object_types ["ChatMessage"]

@doc """
Stores `object` and hands `meta` back to the caller.

Data whose "type" is listed in `@object_types` is created through `Object.create/1`.
Everything else is inserted as an `Activity`, with recipients taken from
`get_recipients/1` and the `local` flag read from `meta` (`Keyword.fetch!/2` raises
when `:local` is absent).

Returns `{:ok, stored, meta}` on success. Any non-matching result of the underlying
create/insert call is returned as-is.
"""
# The spec previously declared a two-element tuple, but both clauses return `meta`
# as a third element.
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
119
@doc """
Inserts an activity built from `map`, unless `Activity.normalize/1` already finds one.

  * `local` - stored in the activity's `local` field.
  * `fake` - when `true`, the function stops before `insert_full_object/1` and
    `Repo.insert/1` and returns an in-memory activity with the id `"pleroma:fakeid"`.
  * `bypass_actor_check` - when truthy, `check_actor_is_active/1` is skipped.

Returns `{:ok, activity}`. Any step that fails to match is wrapped as `{:error, value}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  # `nil` from normalize means the activity is not known yet; an existing
  # `%Activity{}` falls through to the first `else` clause and is returned as-is.
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       # Tagged so the `else` branch can build the fake activity from the filtered map.
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    # Hard match: a failed insert or expiration creation raises a MatchError here.
    {:ok, activity} =
      %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      }
      |> Repo.insert()
      |> maybe_create_activity_expiration()

    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = activity ->
      {:ok, activity}

    # Fake insert: build the struct without touching the database.
    {:fake, true, map, recipients} ->
      activity = %Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
      {:ok, activity}

    # Every other mismatch (including `{:error, _}` tuples) is wrapped once more.
    error ->
      {:error, error}
  end
end
167
@doc """
Creates notifications for `activity`, creates or bumps its conversation, and then
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
176
# When a successfully inserted activity carries "expires_at" in its data, registers
# an `ActivityExpiration` for it. A failure to create the expiration is returned
# instead of the activity. Every other input is passed through untouched.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
184
# Creates or bumps the conversation for `activity` and marks it as read for the
# actor. Returns `{:ok, conversation}`; if either the conversation step or the user
# lookup does not produce the expected shape, that value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
192
# Returns the freshly (force-)preloaded participations of a conversation, or `[]`
# when no `{:ok, conversation}` tuple was given.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
200
@doc """
Preloads the `:user` of every participation and streams them on the
`"participation"` topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end

@doc """
Streams the participations of the conversation an object belongs to.

The conversation is looked up by the object's "context". Its participations are
only streamed when `fetch_latest_direct_activity_id_for_context/2` still finds a
direct activity for `user`. Returns `:noop` for objects without a context.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  with %Conversation{} = found <- Conversation.get_for_ap_id(context) do
    conversation = Repo.preload(found, :participations)
    lookup_opts = %{user: user, blocking_user: user}

    latest_direct_id =
      fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts)

    if latest_direct_id do
      stream_out_participations(conversation.participations)
    end
  end
end

def stream_out_participations(_, _), do: :noop
226
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Every other activity yields `:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
237
@doc """
Runs `do_create/2` inside a database transaction and unwraps the transaction result.

A rolled-back transaction is returned as whatever `Repo.transaction/1` produced.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
244
# Builds and inserts a "Create" activity, then runs the follow-up bookkeeping
# (reply/vote counters, note count, notifications/streaming, federation).
#
# Called from `create/2` inside `Repo.transaction/1`, hence the `Repo.rollback/1`
# in the error branch. Fake inserts and benchmark "quick inserts" return early
# with `{:ok, activity}` and skip the remaining steps.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]
  # In the :benchmark env everything after the counter updates is skipped.
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       # Tagged tuples let the `else` branch tell the early exits apart.
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, activity} ->
      {:ok, activity}

    {:fake, true, activity} ->
      {:ok, activity}

    # Aborts the surrounding transaction with the error reason.
    {:error, message} ->
      Repo.rollback(message)
  end
end
278
@doc """
Builds listen data with `make_listen_data/2`, inserts it, notifies/streams and federates.

`params[:additional]` defaults to `%{}`. The activity is local unless
`params[:local]` is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # Only an explicit `false` turns the local flag off.
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, extra)

  with {:ok, activity} <- insert(listen_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
298
@doc """
Inserts and federates an "Accept" activity described by `params`
(expects the `:to`, `:actor` and `:object` keys).
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
303
@doc """
Inserts and federates a "Reject" activity described by `params`
(expects the `:to`, `:actor` and `:object` keys).
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
308
# Shared implementation of `accept/1` and `reject/1`: builds the activity data for
# `type`, optionally with the given `:activity_id` as "id", then inserts,
# notifies/streams and federates it. `:local` defaults to `true`.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  base = %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
  data = Maps.put_if_present(base, "id", Map.get(params, :activity_id))

  with {:ok, activity} <- insert(data, Map.get(params, :local, true)),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
324
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps the transaction result.

Returns `nil` when no follow activity exists between `follower` and `followed`.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
333
# Cancels the latest follow activity between the two users and inserts, streams and
# federates the matching unfollow activity. Returns `nil` when there is no follow
# to undo; an `{:error, reason}` rolls the surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil ->
      nil

    {:error, reason} ->
      Repo.rollback(reason)
  end
end
347
@doc """
Inserts a report ("flag") activity and e-mails it to every superuser that has an address.

The activity is local unless `params[:local]` is exactly `false`. Unless
`params[:forward]` is exactly `false`, the reported account's ap_id is put into "cc".
The version handed to federation is the one returned by `strip_report_status_data/1`.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # Only an explicit `false` disables these flags.
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ = notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      email = Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)
      Pleroma.Emails.Mailer.deliver_async(email)
    end)

    {:ok, activity}
  end
end
387
@doc """
Inserts a "Move" activity from `origin` to `target` and enqueues the `"move_following"`
background job.

Fails with an error tuple unless `origin.ap_id` is listed in `target.also_known_as`.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  if origin.ap_id in target.also_known_as do
    params = %{
      "type" => "Move",
      "actor" => origin.ap_id,
      "object" => origin.ap_id,
      "target" => target.ap_id
    }

    case insert(params, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        job_args = %{"origin_id" => origin.id, "target_id" => target.id}
        BackgroundWorker.enqueue("move_following", job_args)

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
413
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are limited to the public collection plus, when `opts[:user]` is set,
that user and the collections they follow. Blocks, filters, poll votes and
`opts[:exclude_id]` are applied through the corresponding helpers.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    if user = opts[:user] do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  in_context =
    where(
      base_query,
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )

  in_context
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
443
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
450
@doc """
Returns the id of the newest direct-visibility activity in `context`, or `nil`.

Preloading is skipped by default (`skip_preload: true`); a value given in `opts` wins.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  latest_direct =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})
    |> limit(1)
    |> select([a], a.id)

  Repo.one(latest_direct)
end
461
@doc """
Fetches a page of activities addressed to the public collection.

`:user` is dropped from `opts` before the query is built. Unlisted activities are
only excluded when `opts` contains `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
471
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `restrict_unlisted: true` forced.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
478
@valid_visibilities ~w[direct unlisted public private]

# Limits `query` to activities whose `activity_visibility(actor, recipients, data)`
# equals the requested visibility (a single value or a list of values).
#
# An invalid value is logged and the query is returned unrestricted, the same way
# `exclude_visibility/2` behaves. Previously these branches returned the `:ok` of
# `Logger.error/1`, which broke the rest of the query pipeline. A `nil` visibility
# is treated as "no restriction", again matching `exclude_visibility/2`.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
515
# Removes activities whose `activity_visibility(actor, recipients, data)` matches
# `opts[:exclude_visibilities]` (a single value or a list). Invalid values are
# logged and leave the query untouched; a missing or `nil` option is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn v -> v in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
558
# Applies the `thread_visibility(ap_id, activity_id)` SQL check for the reading user.
# The check is skipped when thread containment is disabled in the config map or on
# the user, and when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
573
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without restricting
the activity type.

The result of `fetch_activities/2` is reversed before it is returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
588
@doc """
Fetches the "Create" and "Announce" activities of `user` as seen by `reading_user`.

Block and mute filtering for `reading_user` is only enabled when `reading_user` does
not block `user`. The result of `fetch_activities/2` is reversed before it is returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
614
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, using offset
pagination. The result of `fetch_activities/3` is reversed before it is returned.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
626
# Recipient list used by the user-timeline fetchers: empty in godmode, otherwise the
# public collection plus (when a reading user is given) that user and the
# collections they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
636
# Drops "Announce" activities whose object was authored by the
# `announce_filtering_user`. Needs the joined object, so it raises when combined
# with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
656
# Keeps only activities with an id greater than `since_id`; an empty or missing
# `since_id` leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
664
# Excludes activities whose object has any of the `tag_reject` tags. Needs the
# joined object, so it raises when combined with `skip_preload: true`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
677
# Keeps only activities whose object carries every tag in `tag_all`. Needs the
# joined object, so it raises when combined with `skip_preload: true`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_tag_all(query, _opts), do: query
690
# Keeps only activities whose object has the given tag (binary) or any of the given
# tags (list). Needs the joined object, so it raises when combined with
# `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
710
# Keeps activities whose recipients overlap with `recipients`. With a user, the
# user's own activities are OR-ed in. An empty recipient list means no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
724
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
730
# With an `:actor_id`, keeps only activities authored by that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
736
# Keeps only activities of the given type. A binary is compared for equality; any
# other value is passed to `ANY(?)` (so a list of types is expected).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
746
# With a `:state`, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
752
# With `:favorited_by`, keeps only activities whose object lists that ap_id in "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
761
# With `only_media: true`, keeps only "Create" activities whose object's
# "attachment" is not an empty list. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
775
# Reply filtering:
#   * `exclude_replies: true`        - only objects without "inReplyTo";
#   * `reply_visibility: "self"`     - non-replies, plus replies addressed to the
#                                      filtering user;
#   * `reply_visibility: "following"` - non-replies, replies addressed (apart from the
#                                      author) to the user or their friends, and the
#                                      user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
819
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
825
# Hides activities authored by, or addressed ("to") to, users muted by the
# `muting_user`. Unless preloading is skipped, rows that have a `thread_mute` binding
# with a user id are hidden as well. `with_muted: true` disables the whole filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts[:skip_preload] do
    without_muted_users
  else
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
845
# Hides activities that involve users or domains blocked by the `blocking_user`:
# blocked authors, blocked recipients, recipients on blocked domains, announces
# addressed to blocked users, and authors/object authors on blocked domains unless
# the user follows them. Joins the object when the query has no `:object` binding yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
892
# With `restrict_unlisted: true`, drops activities that carry the public collection
# in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
906
# With `pinned: true`, keeps only activities whose id is in `pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
912
# Hides "Announce" activities from actors whose reblogs the `muting_user` muted.
# A precomputed list may be supplied in `opts[:reblog_muted_users_ap_ids]`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
929
# With a binary `:instance`, keeps only activities whose actor URL has that value as
# its third "/"-separated part (the host).
# NOTE(review): the fragment references the bare `actor` column rather than the
# binding - presumably to match an expression index; confirm before changing.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(query, [_activity], fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance))
end

defp restrict_instance(query, _opts), do: query
938
# Hides activities whose object content matches the regex composed from the user's
# filters (`Filter.compose_regex/1`); the user's own activities are never hidden.
# The user is taken from `:user`, falling back to `:blocking_user`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _opts), do: query
958
# Drops activities whose object is of type "Answer", unless
# `include_poll_votes: true` is set. Only applies when the query has an `:object`
# binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
970
# Drops activities whose object is of type "ChatMessage", unless
# `include_chat_messages: true` is set. Only applies when the query has an `:object`
# binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
982
# Drops activities authored by invisible users, unless `invisible_actors: true` is
# set. The invisible ap_ids are loaded with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
993
# With a binary `:exclude_id`, drops the activity with that id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
999
# Preloads the object via `Activity.with_preloaded_object/1` unless
# `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
1006
# Preloads the bookmark of `opts[:user]` via `Activity.with_preloaded_bookmark/2`
# unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1013
# Preloads report notes via `Activity.with_preloaded_report_notes/1` when
# `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _opts), do: query
1020
# Sets the thread-muted field for `opts[:muting_user]` (falling back to
# `opts[:user]`) via `Activity.with_set_thread_muted_field/2`, unless
# `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1027
# Orders by id in the direction given by `opts[:order]` (`:desc` or `:asc`); any
# other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
1039
# Loads the muting user's outgoing relationship ap_ids with a single
# `User.outgoing_relationships_ap_ids/2` call and returns three option maps - for
# `restrict_blocked/2`, `restrict_muted/2` and `restrict_muted_reblogs/2` - each with
# its preloaded list merged under `opts` (values already present in `opts` win).
# Blocks are only preloaded when the blocking user is the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []
  same_blocking_user? = opts[:blocking_user] && opts[:blocking_user] == source_user

  relationships =
    if same_blocking_user? do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.merge(%{blocked_users_ap_ids: preloaded[:block]}, opts),
    Map.merge(%{muted_users_ap_ids: preloaded[:mute]}, opts),
    Map.merge(%{reblog_muted_users_ap_ids: preloaded[:reblog_mute]}, opts)
  }
end
1061
@doc """
Builds the activity query for `recipients`, applying every restriction and exclusion
selected through `opts`.

Block, mute and reblog-mute lists are preloaded once by
`fetch_activities_query_ap_ids_ops/1` and handed to the matching restriction helpers.
Thread containment is read from the `[:instance, :skip_thread_containment]` config.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second application only repeated the same WHERE clause
  # and the `Filter.compose_regex/1` call.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1105
@doc """
Fetches a page of activities addressed to `recipients`.

The list memberships of `opts[:user]` are appended to `recipients` before
querying. The paginated result is reversed and then passed through
`maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  list_memberships = Pleroma.List.memberships(viewer)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
1114
@doc """
Fetches the activities a user has favourited.

Results are ordered by the id of the user's `Like` activity, descending, and
that id is exposed as `pagination_id` on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")

  favourites_query =
    likes_query
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select(
      [like, liked_object, target_activity],
      %{target_activity | object: liked_object, pagination_id: like.id}
    )
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already set above, so pagination is told not to add its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
1132
# Adds the viewing user's AP id to the "cc" of every activity whose "bcc"
# contains one of the user's list memberships.
#
# Only applies when there is at least one list membership and a user is given;
# otherwise the activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # List.wrap/1 keeps "cc" a proper list even when the key is missing
        # (nil) or holds a single non-list value; existing lists are unchanged.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1148
# Restricts `query` to activities whose recipients either overlap `recipients`,
# or overlap `recipients_with_public` while also containing the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1157
@doc """
Fetches a page of activities using the bounded recipient restriction.

The base query is built with an empty recipient list and then narrowed by
`fetch_activities_bounded_query/3`. The paginated result is reversed before
being returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1169
@doc """
Stores `file` via `Upload.store/2` and inserts an `Object` holding the result.

When `opts[:actor]` is given it is added to the object data under `"actor"`.
Any non-`{:ok, data}` result of the store step is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1178
# Extracts a URL string from an actor's "url" value.
#
# Accepts a plain string, a map with a string "href", or a list (in which case
# only the first element is considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    link when is_binary(link) -> link
    %{"href" => href} when is_binary(href) -> href
    [first | _rest] -> get_actor_url(first)
    _other -> nil
  end
end
1190
# Converts a string-keyed actor object into the atom-keyed attribute map used
# to build a user (see the `user_data` map below for the produced keys).
#
# Entries of "attachment" and "tag" that do not have the expected shape are
# skipped instead of raising, so one malformed entry does not abort the whole
# conversion.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # Profile fields: only "PropertyValue" attachments, reduced to name/value.
  # The catch-all clause ignores attachments without a "type" key.
  fields =
    data
    |> Map.get("attachment", [])
    |> Enum.filter(fn
      %{"type" => type} -> type == "PropertyValue"
      _ -> false
    end)
    |> Enum.map(fn field -> Map.take(field, ["name", "value"]) end)

  # Custom emoji: shortcode (surrounding colons trimmed) => icon URL.
  # Emoji tags lacking an icon URL or a string name are ignored.
  emojis =
    data
    |> Map.get("tag", [])
    |> Enum.filter(fn
      %{"type" => "Emoji", "icon" => %{"url" => _url}, "name" => name} -> is_binary(name)
      _ -> false
    end)
    |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
      {String.trim(name, ":"), url}
    end)

  locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  # Everything read from `data` below this line sees the fixed-up object.
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
1279
@doc """
Fetches the remote following and followers collections of `user`.

On success returns `{:ok, map}` with `:follower_count`, `:following_count`,
`:hide_follows` and `:hide_followers`. Counts that are not integers are
reported as `0`. Any failure is returned as an `{:error, reason}` tuple.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_follows: follows_hidden,
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1299
# Returns `counter` when it is an integer, and 0 for anything else.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1302
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

The fetch only happens when external user synchronization is enabled, the
`:type` of `user_data` is `"Person"` or `"Service"`, and both collection
addresses are present. In every other case `user_data` is returned unchanged;
unexpected failures are additionally logged as errors.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): object_to_user_data/1 stores the actor type under
  # :actor_type, not :type — confirm that callers populate :type, otherwise
  # the type check below never passes for maps produced by that function.
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched)
    Map.put(user_data, :info, merged_info)
  else
    # One of the preconditions was explicitly false: nothing to do.
    {precondition, false}
    when precondition in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1333
# Decides whether a collection is private (hidden).
#
# Returns {:ok, false} when the "first" page is embedded as a collection page,
# or can be fetched as one. A fetch answered with HTTP 401/403 counts as
# private. A collection without "first" is treated as private. Other failures
# are returned as {:error, reason}.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data) do
  {:ok, true}
end
1350
@doc """
Runs `data` through `MRF.filter/1` and converts the result to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`; any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1358
@doc """
Fetches the remote actor at `ap_id` and converts it into user data.

On success the user data is passed through
`maybe_update_follow_information/1` and returned as `{:ok, data}`. Errors are
logged and returned as `{:error, reason}`; the "Object has been deleted"
error is logged at debug level, everything else at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      message = "Could not decode user at fetch #{ap_id}, #{inspect(reason)}"

      if reason == "Object has been deleted" do
        Logger.debug(message)
      else
        Logger.error(message)
      end

      {:error, reason}
  end
end
1373
@doc """
Renames an existing user whose nickname clashes with the one in `data`.

If a user with `data[:nickname]` exists under a different AP id, that user's
nickname is changed to `"<id>.<nickname>"`. If the AP ids are equal, the
situation is only logged. Returns `nil` when no user has that nickname.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]

  case User.get_by_nickname(nickname) do
    %User{} = old_user ->
      new_ap_id = data[:ap_id]

      if new_ap_id == old_user.ap_id do
        Logger.info(
          "Found an old user for #{nickname}, but the ap id #{new_ap_id} is the same as the new user. Race condition? Not changing anything."
        )
      else
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{new_ap_id}, renaming."
        )

        renamed = "#{old_user.id}.#{old_user.nickname}"

        old_user
        |> User.remote_user_changeset(%{nickname: renamed})
        |> User.update_and_set_cache()
      end

    _ ->
      nil
  end
end
1398
@doc """
Creates or refreshes the user identified by `ap_id` from its remote actor.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
an existing user is updated, and a new one is inserted after resolving any
nickname clash. A failed fetch result is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          changeset = User.remote_user_changeset(cached_user, data)
          User.update_and_set_cache(changeset)
        else
          # Free the nickname first in case another user currently holds it.
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1421
@doc """
Resolves `nickname` via WebFinger and builds the user from the found AP id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield a
non-nil `"ap_id"`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1429
# Filter out broken threads: the result is whatever
# entire_thread_visible_for_user?/2 returns for this activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1434
# Post-processing for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1439
@doc """
Builds the query for direct messages: `Create` activities restricted to
`"direct"` visibility, ordered by ascending id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [a], asc: a.id)
end
1446 end