c33848277d8b8577e1a08e8befb437808b5bc03b
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
35 defp get_recipients(%{"type" => "Create"} = data) do
36 to = Map.get(data, "to", [])
37 cc = Map.get(data, "cc", [])
38 bcc = Map.get(data, "bcc", [])
39 actor = Map.get(data, "actor", [])
40 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
41 {recipients, to, cc}
42 end
43
44 defp get_recipients(data) do
45 to = Map.get(data, "to", [])
46 cc = Map.get(data, "cc", [])
47 bcc = Map.get(data, "bcc", [])
48 recipients = Enum.concat([to, cc, bcc])
49 {recipients, to, cc}
50 end
51
52 defp check_actor_is_active(nil), do: true
53
54 defp check_actor_is_active(actor) when is_binary(actor) do
55 case User.get_cached_by_ap_id(actor) do
56 %User{deactivated: deactivated} -> not deactivated
57 _ -> false
58 end
59 end
60
61 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
62 limit = Config.get([:instance, :remote_limit])
63 String.length(content) <= limit
64 end
65
66 defp check_remote_limit(_), do: true
67
68 def increase_note_count_if_public(actor, object) do
69 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
70 end
71
72 def decrease_note_count_if_public(actor, object) do
73 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
74 end
75
76 defp increase_replies_count_if_reply(%{
77 "object" => %{"inReplyTo" => reply_ap_id} = object,
78 "type" => "Create"
79 }) do
80 if is_public?(object) do
81 Object.increase_replies_count(reply_ap_id)
82 end
83 end
84
85 defp increase_replies_count_if_reply(_create_data), do: :noop
86
87 @object_types ~w[ChatMessage Question Answer Audio Event]
88 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
89 def persist(%{"type" => type} = object, meta) when type in @object_types do
90 with {:ok, object} <- Object.create(object) do
91 {:ok, object, meta}
92 end
93 end
94
95 def persist(object, meta) do
96 with local <- Keyword.fetch!(meta, :local),
97 {recipients, _, _} <- get_recipients(object),
98 {:ok, activity} <-
99 Repo.insert(%Activity{
100 data: object,
101 local: local,
102 recipients: recipients,
103 actor: object["actor"]
104 }) do
105 {:ok, activity, meta}
106 end
107 end
108
109 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
110 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
111 with nil <- Activity.normalize(map),
112 map <- lazy_put_activity_defaults(map, fake),
113 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
114 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
115 {:ok, map} <- MRF.filter(map),
116 {recipients, _, _} = get_recipients(map),
117 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
118 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
119 {:ok, map, object} <- insert_full_object(map) do
120 {:ok, activity} =
121 %Activity{
122 data: map,
123 local: local,
124 actor: map["actor"],
125 recipients: recipients
126 }
127 |> Repo.insert()
128 |> maybe_create_activity_expiration()
129
130 # Splice in the child object if we have one.
131 activity = Maps.put_if_present(activity, :object, object)
132
133 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
134
135 {:ok, activity}
136 else
137 %Activity{} = activity ->
138 {:ok, activity}
139
140 {:fake, true, map, recipients} ->
141 activity = %Activity{
142 data: map,
143 local: local,
144 actor: map["actor"],
145 recipients: recipients,
146 id: "pleroma:fakeid"
147 }
148
149 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
150 {:ok, activity}
151
152 error ->
153 {:error, error}
154 end
155 end
156
157 def notify_and_stream(activity) do
158 Notification.create_notifications(activity)
159
160 conversation = create_or_bump_conversation(activity, activity.actor)
161 participations = get_participations(conversation)
162 stream_out(activity)
163 stream_out_participations(participations)
164 end
165
166 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
167 with {:ok, _job} <-
168 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
169 activity_id: activity.id,
170 expires_at: expires_at
171 }) do
172 {:ok, activity}
173 end
174 end
175
176 defp maybe_create_activity_expiration(result), do: result
177
178 defp create_or_bump_conversation(activity, actor) do
179 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
180 %User{} = user <- User.get_cached_by_ap_id(actor) do
181 Participation.mark_as_read(user, conversation)
182 {:ok, conversation}
183 end
184 end
185
186 defp get_participations({:ok, conversation}) do
187 conversation
188 |> Repo.preload(:participations, force: true)
189 |> Map.get(:participations)
190 end
191
192 defp get_participations(_), do: []
193
194 def stream_out_participations(participations) do
195 participations =
196 participations
197 |> Repo.preload(:user)
198
199 Streamer.stream("participation", participations)
200 end
201
202 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
203 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
204 conversation = Repo.preload(conversation, :participations)
205
206 last_activity_id =
207 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
208 user: user,
209 blocking_user: user
210 })
211
212 if last_activity_id do
213 stream_out_participations(conversation.participations)
214 end
215 end
216 end
217
218 def stream_out_participations(_, _), do: :noop
219
220 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
221 when data_type in ["Create", "Announce", "Delete"] do
222 activity
223 |> Topics.get_activity_topics()
224 |> Streamer.stream(activity)
225 end
226
227 def stream_out(_activity) do
228 :noop
229 end
230
231 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
232 def create(params, fake \\ false) do
233 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
234 result
235 end
236 end
237
238 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
239 additional = params[:additional] || %{}
240 # only accept false as false value
241 local = !(params[:local] == false)
242 published = params[:published]
243 quick_insert? = Config.get([:env]) == :benchmark
244
245 create_data =
246 make_create_data(
247 %{to: to, actor: actor, published: published, context: context, object: object},
248 additional
249 )
250
251 with {:ok, activity} <- insert(create_data, local, fake),
252 {:fake, false, activity} <- {:fake, fake, activity},
253 _ <- increase_replies_count_if_reply(create_data),
254 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
255 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
256 _ <- notify_and_stream(activity),
257 :ok <- maybe_federate(activity) do
258 {:ok, activity}
259 else
260 {:quick_insert, true, activity} ->
261 {:ok, activity}
262
263 {:fake, true, activity} ->
264 {:ok, activity}
265
266 {:error, message} ->
267 Repo.rollback(message)
268 end
269 end
270
271 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
272 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
273 additional = params[:additional] || %{}
274 # only accept false as false value
275 local = !(params[:local] == false)
276 published = params[:published]
277
278 listen_data =
279 make_listen_data(
280 %{to: to, actor: actor, published: published, context: context, object: object},
281 additional
282 )
283
284 with {:ok, activity} <- insert(listen_data, local),
285 _ <- notify_and_stream(activity),
286 :ok <- maybe_federate(activity) do
287 {:ok, activity}
288 end
289 end
290
291 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
292 {:ok, Activity.t()} | nil | {:error, any()}
293 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
294 with {:ok, result} <-
295 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
296 result
297 end
298 end
299
300 defp do_unfollow(follower, followed, activity_id, local) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_federate(activity) do
307 {:ok, activity}
308 else
309 nil -> nil
310 {:error, error} -> Repo.rollback(error)
311 end
312 end
313
314 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
315 def flag(
316 %{
317 actor: actor,
318 context: _context,
319 account: account,
320 statuses: statuses,
321 content: content
322 } = params
323 ) do
324 # only accept false as false value
325 local = !(params[:local] == false)
326 forward = !(params[:forward] == false)
327
328 additional = params[:additional] || %{}
329
330 additional =
331 if forward do
332 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
333 else
334 Map.merge(additional, %{"to" => [], "cc" => []})
335 end
336
337 with flag_data <- make_flag_data(params, additional),
338 {:ok, activity} <- insert(flag_data, local),
339 {:ok, stripped_activity} <- strip_report_status_data(activity),
340 _ <- notify_and_stream(activity),
341 :ok <- maybe_federate(stripped_activity) do
342 User.all_superusers()
343 |> Enum.filter(fn user -> not is_nil(user.email) end)
344 |> Enum.each(fn superuser ->
345 superuser
346 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
347 |> Pleroma.Emails.Mailer.deliver_async()
348 end)
349
350 {:ok, activity}
351 end
352 end
353
354 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
355 def move(%User{} = origin, %User{} = target, local \\ true) do
356 params = %{
357 "type" => "Move",
358 "actor" => origin.ap_id,
359 "object" => origin.ap_id,
360 "target" => target.ap_id
361 }
362
363 with true <- origin.ap_id in target.also_known_as,
364 {:ok, activity} <- insert(params, local),
365 _ <- notify_and_stream(activity) do
366 maybe_federate(activity)
367
368 BackgroundWorker.enqueue("move_following", %{
369 "origin_id" => origin.id,
370 "target_id" => target.id
371 })
372
373 {:ok, activity}
374 else
375 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
376 err -> err
377 end
378 end
379
380 def fetch_activities_for_context_query(context, opts) do
381 public = [Constants.as_public()]
382
383 recipients =
384 if opts[:user],
385 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
386 else: public
387
388 from(activity in Activity)
389 |> maybe_preload_objects(opts)
390 |> maybe_preload_bookmarks(opts)
391 |> maybe_set_thread_muted_field(opts)
392 |> restrict_blocked(opts)
393 |> restrict_recipients(recipients, opts[:user])
394 |> restrict_filtered(opts)
395 |> where(
396 [activity],
397 fragment(
398 "?->>'type' = ? and ?->>'context' = ?",
399 activity.data,
400 "Create",
401 activity.data,
402 ^context
403 )
404 )
405 |> exclude_poll_votes(opts)
406 |> exclude_id(opts)
407 |> order_by([activity], desc: activity.id)
408 end
409
410 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
411 def fetch_activities_for_context(context, opts \\ %{}) do
412 context
413 |> fetch_activities_for_context_query(opts)
414 |> Repo.all()
415 end
416
417 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
418 FlakeId.Ecto.CompatType.t() | nil
419 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
420 context
421 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
422 |> restrict_visibility(%{visibility: "direct"})
423 |> limit(1)
424 |> select([a], a.id)
425 |> Repo.one()
426 end
427
428 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
429 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
430 opts = Map.delete(opts, :user)
431
432 [Constants.as_public()]
433 |> fetch_activities_query(opts)
434 |> restrict_unlisted(opts)
435 |> Pagination.fetch_paginated(opts, pagination)
436 end
437
438 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
439 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
440 opts
441 |> Map.put(:restrict_unlisted, true)
442 |> fetch_public_or_unlisted_activities(pagination)
443 end
444
445 @valid_visibilities ~w[direct unlisted public private]
446
447 defp restrict_visibility(query, %{visibility: visibility})
448 when is_list(visibility) do
449 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
450 from(
451 a in query,
452 where:
453 fragment(
454 "activity_visibility(?, ?, ?) = ANY (?)",
455 a.actor,
456 a.recipients,
457 a.data,
458 ^visibility
459 )
460 )
461 else
462 Logger.error("Could not restrict visibility to #{visibility}")
463 end
464 end
465
466 defp restrict_visibility(query, %{visibility: visibility})
467 when visibility in @valid_visibilities do
468 from(
469 a in query,
470 where:
471 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
472 )
473 end
474
475 defp restrict_visibility(_query, %{visibility: visibility})
476 when visibility not in @valid_visibilities do
477 Logger.error("Could not restrict visibility to #{visibility}")
478 end
479
480 defp restrict_visibility(query, _visibility), do: query
481
482 defp exclude_visibility(query, %{exclude_visibilities: visibility})
483 when is_list(visibility) do
484 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
485 from(
486 a in query,
487 where:
488 not fragment(
489 "activity_visibility(?, ?, ?) = ANY (?)",
490 a.actor,
491 a.recipients,
492 a.data,
493 ^visibility
494 )
495 )
496 else
497 Logger.error("Could not exclude visibility to #{visibility}")
498 query
499 end
500 end
501
502 defp exclude_visibility(query, %{exclude_visibilities: visibility})
503 when visibility in @valid_visibilities do
504 from(
505 a in query,
506 where:
507 not fragment(
508 "activity_visibility(?, ?, ?) = ?",
509 a.actor,
510 a.recipients,
511 a.data,
512 ^visibility
513 )
514 )
515 end
516
517 defp exclude_visibility(query, %{exclude_visibilities: visibility})
518 when visibility not in [nil | @valid_visibilities] do
519 Logger.error("Could not exclude visibility to #{visibility}")
520 query
521 end
522
523 defp exclude_visibility(query, _visibility), do: query
524
525 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
526 do: query
527
528 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
529 do: query
530
531 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
532 from(
533 a in query,
534 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
535 )
536 end
537
538 defp restrict_thread_visibility(query, _, _), do: query
539
540 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
541 params =
542 params
543 |> Map.put(:user, reading_user)
544 |> Map.put(:actor_id, user.ap_id)
545
546 %{
547 godmode: params[:godmode],
548 reading_user: reading_user
549 }
550 |> user_activities_recipients()
551 |> fetch_activities(params)
552 |> Enum.reverse()
553 end
554
555 def fetch_user_activities(user, reading_user, params \\ %{}) do
556 params =
557 params
558 |> Map.put(:type, ["Create", "Announce"])
559 |> Map.put(:user, reading_user)
560 |> Map.put(:actor_id, user.ap_id)
561 |> Map.put(:pinned_activity_ids, user.pinned_activities)
562
563 params =
564 if User.blocks?(reading_user, user) do
565 params
566 else
567 params
568 |> Map.put(:blocking_user, reading_user)
569 |> Map.put(:muting_user, reading_user)
570 end
571
572 %{
573 godmode: params[:godmode],
574 reading_user: reading_user
575 }
576 |> user_activities_recipients()
577 |> fetch_activities(params)
578 |> Enum.reverse()
579 end
580
581 def fetch_statuses(reading_user, params) do
582 params = Map.put(params, :type, ["Create", "Announce"])
583
584 %{
585 godmode: params[:godmode],
586 reading_user: reading_user
587 }
588 |> user_activities_recipients()
589 |> fetch_activities(params, :offset)
590 |> Enum.reverse()
591 end
592
593 defp user_activities_recipients(%{godmode: true}), do: []
594
595 defp user_activities_recipients(%{reading_user: reading_user}) do
596 if reading_user do
597 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
598 else
599 [Constants.as_public()]
600 end
601 end
602
603 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
604 raise "Can't use the child object without preloading!"
605 end
606
607 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
608 from(
609 [activity, object] in query,
610 where:
611 fragment(
612 "?->>'type' != ? or ?->>'actor' != ?",
613 activity.data,
614 "Announce",
615 object.data,
616 ^actor
617 )
618 )
619 end
620
621 defp restrict_announce_object_actor(query, _), do: query
622
623 defp restrict_since(query, %{since_id: ""}), do: query
624
625 defp restrict_since(query, %{since_id: since_id}) do
626 from(activity in query, where: activity.id > ^since_id)
627 end
628
629 defp restrict_since(query, _), do: query
630
631 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
632 raise "Can't use the child object without preloading!"
633 end
634
635 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
636 from(
637 [_activity, object] in query,
638 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
639 )
640 end
641
642 defp restrict_tag_reject(query, _), do: query
643
644 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
645 raise "Can't use the child object without preloading!"
646 end
647
648 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
649 from(
650 [_activity, object] in query,
651 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
652 )
653 end
654
655 defp restrict_tag_all(query, _), do: query
656
657 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
658 raise "Can't use the child object without preloading!"
659 end
660
661 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
662 from(
663 [_activity, object] in query,
664 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
665 )
666 end
667
668 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
669 from(
670 [_activity, object] in query,
671 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
672 )
673 end
674
675 defp restrict_tag(query, _), do: query
676
677 defp restrict_recipients(query, [], _user), do: query
678
679 defp restrict_recipients(query, recipients, nil) do
680 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
681 end
682
683 defp restrict_recipients(query, recipients, user) do
684 from(
685 activity in query,
686 where: fragment("? && ?", ^recipients, activity.recipients),
687 or_where: activity.actor == ^user.ap_id
688 )
689 end
690
691 defp restrict_local(query, %{local_only: true}) do
692 from(activity in query, where: activity.local == true)
693 end
694
695 defp restrict_local(query, _), do: query
696
697 defp restrict_actor(query, %{actor_id: actor_id}) do
698 from(activity in query, where: activity.actor == ^actor_id)
699 end
700
701 defp restrict_actor(query, _), do: query
702
703 defp restrict_type(query, %{type: type}) when is_binary(type) do
704 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
705 end
706
707 defp restrict_type(query, %{type: type}) do
708 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
709 end
710
711 defp restrict_type(query, _), do: query
712
713 defp restrict_state(query, %{state: state}) do
714 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
715 end
716
717 defp restrict_state(query, _), do: query
718
719 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
720 from(
721 [_activity, object] in query,
722 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
723 )
724 end
725
726 defp restrict_favorited_by(query, _), do: query
727
728 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
729 raise "Can't use the child object without preloading!"
730 end
731
732 defp restrict_media(query, %{only_media: true}) do
733 from(
734 [activity, object] in query,
735 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
736 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
737 )
738 end
739
740 defp restrict_media(query, _), do: query
741
742 defp restrict_replies(query, %{exclude_replies: true}) do
743 from(
744 [_activity, object] in query,
745 where: fragment("?->>'inReplyTo' is null", object.data)
746 )
747 end
748
749 defp restrict_replies(query, %{
750 reply_filtering_user: user,
751 reply_visibility: "self"
752 }) do
753 from(
754 [activity, object] in query,
755 where:
756 fragment(
757 "?->>'inReplyTo' is null OR ? = ANY(?)",
758 object.data,
759 ^user.ap_id,
760 activity.recipients
761 )
762 )
763 end
764
765 defp restrict_replies(query, %{
766 reply_filtering_user: user,
767 reply_visibility: "following"
768 }) do
769 from(
770 [activity, object] in query,
771 where:
772 fragment(
773 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
774 object.data,
775 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
776 activity.recipients,
777 activity.actor,
778 activity.actor,
779 ^user.ap_id
780 )
781 )
782 end
783
784 defp restrict_replies(query, _), do: query
785
786 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
787 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
788 end
789
790 defp restrict_reblogs(query, _), do: query
791
792 defp restrict_muted(query, %{with_muted: true}), do: query
793
794 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
795 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
796
797 query =
798 from([activity] in query,
799 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
800 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
801 )
802
803 unless opts[:skip_preload] do
804 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
805 else
806 query
807 end
808 end
809
810 defp restrict_muted(query, _), do: query
811
812 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
813 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
814 domain_blocks = user.domain_blocks || []
815
816 following_ap_ids = User.get_friends_ap_ids(user)
817
818 query =
819 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
820
821 from(
822 [activity, object: o] in query,
823 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
824 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
825 where:
826 fragment(
827 "recipients_contain_blocked_domains(?, ?) = false",
828 activity.recipients,
829 ^domain_blocks
830 ),
831 where:
832 fragment(
833 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
834 activity.data,
835 activity.data,
836 ^blocked_ap_ids
837 ),
838 where:
839 fragment(
840 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
841 activity.actor,
842 ^domain_blocks,
843 activity.actor,
844 ^following_ap_ids
845 ),
846 where:
847 fragment(
848 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
849 o.data,
850 ^domain_blocks,
851 o.data,
852 ^following_ap_ids
853 )
854 )
855 end
856
857 defp restrict_blocked(query, _), do: query
858
859 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
860 from(
861 activity in query,
862 where:
863 fragment(
864 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
865 activity.data,
866 ^[Constants.as_public()]
867 )
868 )
869 end
870
871 defp restrict_unlisted(query, _), do: query
872
873 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
874 from(activity in query, where: activity.id in ^ids)
875 end
876
877 defp restrict_pinned(query, _), do: query
878
879 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
880 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
881
882 from(
883 activity in query,
884 where:
885 fragment(
886 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
887 activity.data,
888 activity.actor,
889 ^muted_reblogs
890 )
891 )
892 end
893
894 defp restrict_muted_reblogs(query, _), do: query
895
896 defp restrict_instance(query, %{instance: instance}) do
897 users =
898 from(
899 u in User,
900 select: u.ap_id,
901 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
902 )
903 |> Repo.all()
904
905 from(activity in query, where: activity.actor in ^users)
906 end
907
908 defp restrict_instance(query, _), do: query
909
910 defp restrict_filtered(query, %{user: %User{} = user}) do
911 case Filter.compose_regex(user) do
912 nil ->
913 query
914
915 regex ->
916 from([activity, object] in query,
917 where:
918 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
919 activity.actor == ^user.ap_id
920 )
921 end
922 end
923
924 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
925 restrict_filtered(query, %{user: user})
926 end
927
928 defp restrict_filtered(query, _), do: query
929
930 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
931
932 defp exclude_poll_votes(query, _) do
933 if has_named_binding?(query, :object) do
934 from([activity, object: o] in query,
935 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
936 )
937 else
938 query
939 end
940 end
941
942 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
943
944 defp exclude_chat_messages(query, _) do
945 if has_named_binding?(query, :object) do
946 from([activity, object: o] in query,
947 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
948 )
949 else
950 query
951 end
952 end
953
954 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
955
956 defp exclude_invisible_actors(query, _opts) do
957 invisible_ap_ids =
958 User.Query.build(%{invisible: true, select: [:ap_id]})
959 |> Repo.all()
960 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
961
962 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
963 end
964
965 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
966 from(activity in query, where: activity.id != ^id)
967 end
968
969 defp exclude_id(query, _), do: query
970
971 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
972
973 defp maybe_preload_objects(query, _) do
974 query
975 |> Activity.with_preloaded_object()
976 end
977
978 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
979
980 defp maybe_preload_bookmarks(query, opts) do
981 query
982 |> Activity.with_preloaded_bookmark(opts[:user])
983 end
984
985 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
986 query
987 |> Activity.with_preloaded_report_notes()
988 end
989
990 defp maybe_preload_report_notes(query, _), do: query
991
992 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
993
994 defp maybe_set_thread_muted_field(query, opts) do
995 query
996 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
997 end
998
999 defp maybe_order(query, %{order: :desc}) do
1000 query
1001 |> order_by(desc: :id)
1002 end
1003
1004 defp maybe_order(query, %{order: :asc}) do
1005 query
1006 |> order_by(asc: :id)
1007 end
1008
1009 defp maybe_order(query, _), do: query
1010
1011 defp fetch_activities_query_ap_ids_ops(opts) do
1012 source_user = opts[:muting_user]
1013 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1014
1015 ap_id_relationships =
1016 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1017 [:block | ap_id_relationships]
1018 else
1019 ap_id_relationships
1020 end
1021
1022 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1023
1024 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1025 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1026
1027 restrict_muted_reblogs_opts =
1028 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1029
1030 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1031 end
1032
1033 def fetch_activities_query(recipients, opts \\ %{}) do
1034 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1035 fetch_activities_query_ap_ids_ops(opts)
1036
1037 config = %{
1038 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1039 }
1040
1041 Activity
1042 |> maybe_preload_objects(opts)
1043 |> maybe_preload_bookmarks(opts)
1044 |> maybe_preload_report_notes(opts)
1045 |> maybe_set_thread_muted_field(opts)
1046 |> maybe_order(opts)
1047 |> restrict_recipients(recipients, opts[:user])
1048 |> restrict_replies(opts)
1049 |> restrict_tag(opts)
1050 |> restrict_tag_reject(opts)
1051 |> restrict_tag_all(opts)
1052 |> restrict_since(opts)
1053 |> restrict_local(opts)
1054 |> restrict_actor(opts)
1055 |> restrict_type(opts)
1056 |> restrict_state(opts)
1057 |> restrict_favorited_by(opts)
1058 |> restrict_blocked(restrict_blocked_opts)
1059 |> restrict_muted(restrict_muted_opts)
1060 |> restrict_filtered(opts)
1061 |> restrict_media(opts)
1062 |> restrict_visibility(opts)
1063 |> restrict_thread_visibility(opts, config)
1064 |> restrict_reblogs(opts)
1065 |> restrict_pinned(opts)
1066 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1067 |> restrict_instance(opts)
1068 |> restrict_announce_object_actor(opts)
1069 |> restrict_filtered(opts)
1070 |> Activity.restrict_deactivated_users()
1071 |> exclude_poll_votes(opts)
1072 |> exclude_chat_messages(opts)
1073 |> exclude_invisible_actors(opts)
1074 |> exclude_visibility(opts)
1075 end
1076
1077 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1078 list_memberships = Pleroma.List.memberships(opts[:user])
1079
1080 fetch_activities_query(recipients ++ list_memberships, opts)
1081 |> Pagination.fetch_paginated(opts, pagination)
1082 |> Enum.reverse()
1083 |> maybe_update_cc(list_memberships, opts[:user])
1084 end
1085
1086 @doc """
1087 Fetch favorites activities of user with order by sort adds to favorites
1088 """
1089 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1090 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1091 user.ap_id
1092 |> Activity.Queries.by_actor()
1093 |> Activity.Queries.by_type("Like")
1094 |> Activity.with_joined_object()
1095 |> Object.with_joined_activity()
1096 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1097 |> order_by([like, _, _], desc_nulls_last: like.id)
1098 |> Pagination.fetch_paginated(
1099 Map.merge(params, %{skip_order: true}),
1100 pagination
1101 )
1102 end
1103
1104 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1105 Enum.map(activities, fn
1106 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1107 if Enum.any?(bcc, &(&1 in list_memberships)) do
1108 update_in(activity.data["cc"], &[user_ap_id | &1])
1109 else
1110 activity
1111 end
1112
1113 activity ->
1114 activity
1115 end)
1116 end
1117
1118 defp maybe_update_cc(activities, _, _), do: activities
1119
1120 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1121 from(activity in query,
1122 where:
1123 fragment("? && ?", activity.recipients, ^recipients) or
1124 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1125 ^Constants.as_public() in activity.recipients)
1126 )
1127 end
1128
1129 def fetch_activities_bounded(
1130 recipients,
1131 recipients_with_public,
1132 opts \\ %{},
1133 pagination \\ :keyset
1134 ) do
1135 fetch_activities_query([], opts)
1136 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1137 |> Pagination.fetch_paginated(opts, pagination)
1138 |> Enum.reverse()
1139 end
1140
1141 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1142 def upload(file, opts \\ []) do
1143 with {:ok, data} <- Upload.store(file, opts) do
1144 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1145
1146 Repo.insert(%Object{data: obj_data})
1147 end
1148 end
1149
1150 @spec get_actor_url(any()) :: binary() | nil
1151 defp get_actor_url(url) when is_binary(url), do: url
1152 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1153
1154 defp get_actor_url(url) when is_list(url) do
1155 url
1156 |> List.first()
1157 |> get_actor_url()
1158 end
1159
1160 defp get_actor_url(_url), do: nil
1161
1162 defp object_to_user_data(data) do
1163 avatar =
1164 data["icon"]["url"] &&
1165 %{
1166 "type" => "Image",
1167 "url" => [%{"href" => data["icon"]["url"]}]
1168 }
1169
1170 banner =
1171 data["image"]["url"] &&
1172 %{
1173 "type" => "Image",
1174 "url" => [%{"href" => data["image"]["url"]}]
1175 }
1176
1177 fields =
1178 data
1179 |> Map.get("attachment", [])
1180 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1181 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1182
1183 emojis =
1184 data
1185 |> Map.get("tag", [])
1186 |> Enum.filter(fn
1187 %{"type" => "Emoji"} -> true
1188 _ -> false
1189 end)
1190 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1191 {String.trim(name, ":"), url}
1192 end)
1193
1194 locked = data["manuallyApprovesFollowers"] || false
1195 capabilities = data["capabilities"] || %{}
1196 accepts_chat_messages = capabilities["acceptsChatMessages"]
1197 data = Transmogrifier.maybe_fix_user_object(data)
1198 discoverable = data["discoverable"] || false
1199 invisible = data["invisible"] || false
1200 actor_type = data["type"] || "Person"
1201
1202 public_key =
1203 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1204 data["publicKey"]["publicKeyPem"]
1205 else
1206 nil
1207 end
1208
1209 shared_inbox =
1210 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1211 data["endpoints"]["sharedInbox"]
1212 else
1213 nil
1214 end
1215
1216 user_data = %{
1217 ap_id: data["id"],
1218 uri: get_actor_url(data["url"]),
1219 ap_enabled: true,
1220 banner: banner,
1221 fields: fields,
1222 emoji: emojis,
1223 locked: locked,
1224 discoverable: discoverable,
1225 invisible: invisible,
1226 avatar: avatar,
1227 name: data["name"],
1228 follower_address: data["followers"],
1229 following_address: data["following"],
1230 bio: data["summary"] || "",
1231 actor_type: actor_type,
1232 also_known_as: Map.get(data, "alsoKnownAs", []),
1233 public_key: public_key,
1234 inbox: data["inbox"],
1235 shared_inbox: shared_inbox,
1236 accepts_chat_messages: accepts_chat_messages
1237 }
1238
1239 # nickname can be nil because of virtual actors
1240 if data["preferredUsername"] do
1241 Map.put(
1242 user_data,
1243 :nickname,
1244 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1245 )
1246 else
1247 Map.put(user_data, :nickname, nil)
1248 end
1249 end
1250
1251 def fetch_follow_information_for_user(user) do
1252 with {:ok, following_data} <-
1253 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1254 {:ok, hide_follows} <- collection_private(following_data),
1255 {:ok, followers_data} <-
1256 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1257 {:ok, hide_followers} <- collection_private(followers_data) do
1258 {:ok,
1259 %{
1260 hide_follows: hide_follows,
1261 follower_count: normalize_counter(followers_data["totalItems"]),
1262 following_count: normalize_counter(following_data["totalItems"]),
1263 hide_followers: hide_followers
1264 }}
1265 else
1266 {:error, _} = e -> e
1267 e -> {:error, e}
1268 end
1269 end
1270
1271 defp normalize_counter(counter) when is_integer(counter), do: counter
1272 defp normalize_counter(_), do: 0
1273
1274 def maybe_update_follow_information(user_data) do
1275 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1276 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1277 {_, true} <-
1278 {:collections_available,
1279 !!(user_data[:following_address] && user_data[:follower_address])},
1280 {:ok, info} <-
1281 fetch_follow_information_for_user(user_data) do
1282 info = Map.merge(user_data[:info] || %{}, info)
1283
1284 user_data
1285 |> Map.put(:info, info)
1286 else
1287 {:user_type_check, false} ->
1288 user_data
1289
1290 {:collections_available, false} ->
1291 user_data
1292
1293 {:enabled, false} ->
1294 user_data
1295
1296 e ->
1297 Logger.error(
1298 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1299 )
1300
1301 user_data
1302 end
1303 end
1304
1305 defp collection_private(%{"first" => %{"type" => type}})
1306 when type in ["CollectionPage", "OrderedCollectionPage"],
1307 do: {:ok, false}
1308
1309 defp collection_private(%{"first" => first}) do
1310 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1311 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1312 {:ok, false}
1313 else
1314 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1315 {:error, _} = e -> e
1316 e -> {:error, e}
1317 end
1318 end
1319
1320 defp collection_private(_data), do: {:ok, true}
1321
1322 def user_data_from_user_object(data) do
1323 with {:ok, data} <- MRF.filter(data) do
1324 {:ok, object_to_user_data(data)}
1325 else
1326 e -> {:error, e}
1327 end
1328 end
1329
1330 def fetch_and_prepare_user_from_ap_id(ap_id) do
1331 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1332 {:ok, data} <- user_data_from_user_object(data) do
1333 {:ok, maybe_update_follow_information(data)}
1334 else
1335 {:error, "Object has been deleted" = e} ->
1336 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1337 {:error, e}
1338
1339 {:error, {:reject, reason} = e} ->
1340 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1341 {:error, e}
1342
1343 {:error, e} ->
1344 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1345 {:error, e}
1346 end
1347 end
1348
1349 def maybe_handle_clashing_nickname(data) do
1350 with nickname when is_binary(nickname) <- data[:nickname],
1351 %User{} = old_user <- User.get_by_nickname(nickname),
1352 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1353 Logger.info(
1354 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
1355 data[:ap_id]
1356 }, renaming."
1357 )
1358
1359 old_user
1360 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1361 |> User.update_and_set_cache()
1362 else
1363 {:ap_id_comparison, true} ->
1364 Logger.info(
1365 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1366 )
1367
1368 _ ->
1369 nil
1370 end
1371 end
1372
1373 def make_user_from_ap_id(ap_id) do
1374 user = User.get_cached_by_ap_id(ap_id)
1375
1376 if user && !User.ap_enabled?(user) do
1377 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1378 else
1379 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1380 if user do
1381 user
1382 |> User.remote_user_changeset(data)
1383 |> User.update_and_set_cache()
1384 else
1385 maybe_handle_clashing_nickname(data)
1386
1387 data
1388 |> User.remote_user_changeset()
1389 |> Repo.insert()
1390 |> User.set_cache()
1391 end
1392 end
1393 end
1394 end
1395
1396 def make_user_from_nickname(nickname) do
1397 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1398 make_user_from_ap_id(ap_id)
1399 else
1400 _e -> {:error, "No AP id in WebFinger"}
1401 end
1402 end
1403
1404 # filter out broken threads
1405 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1406 entire_thread_visible_for_user?(activity, user)
1407 end
1408
1409 # do post-processing on a specific activity
1410 def contain_activity(%Activity{} = activity, %User{} = user) do
1411 contain_broken_threads(activity, user)
1412 end
1413
1414 def fetch_direct_messages_query do
1415 Activity
1416 |> restrict_type(%{type: "Create"})
1417 |> restrict_visibility(%{visibility: "direct"})
1418 |> order_by([activity], asc: activity.id)
1419 end
1420 end