Merge remote-tracking branch 'pleroma/develop' into features/poll-validation
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
# Collects the addressing of an activity map.
#
# Returns `{recipients, to, cc}`, where `to` and `cc` are the values stored under
# the "to" and "cc" keys (defaulting to `[]`) and `recipients` is the
# concatenation of "to", "cc" and "bcc".
#
# For "Create" activities the actor is appended to the recipients and the list
# is de-duplicated. A missing actor contributes nothing: `List.wrap/1` turns
# `nil` into `[]`, so no empty-list element ends up among the recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
52
# Tells whether the actor of an activity is allowed to act.
#
# `nil` (no actor) passes. For a binary AP id the cached user is looked up:
# the result is the negation of its `deactivated` flag, and `false` when the
# lookup does not yield a `%User{}`.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
61
# Checks the embedded object's "content" against the configured
# `[:instance, :remote_limit]` length. Activities without an embedded object
# carrying non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
68
@doc """
Increases the note count of `actor` when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
72
@doc """
Decreases the note count of `actor` when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
76
# Bumps the replies counter of the object being replied to.
#
# Only applies to "Create" activities whose embedded object has an "inReplyTo"
# key and is public; a non-public reply yields `nil`, anything else `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id), else: nil
end

defp increase_replies_count_if_reply(_create_data), do: :noop
87
@object_types ["ChatMessage", "Question", "Answer"]

@doc """
Persists `object` and passes `meta` through.

Maps whose "type" is one of `@object_types` are stored with `Object.create/1`;
everything else is inserted as an `Activity`, using the `:local` entry of
`meta` (which must be present, see `Keyword.fetch!/2`) and the recipients
derived from the map.

Returns `{:ok, stored, meta}` on success. Any non-matching result of the
underlying create/insert call is returned unchanged.
"""
# The spec previously promised a 2-tuple although both clauses return `meta` as
# a third element, and it did not mention the error fall-through of `with`.
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
109
@doc """
Inserts the activity described by `map`.

The steps run in order and the first failing one aborts the insert:

  * an activity that `Activity.normalize/1` already knows is returned as-is;
  * defaults are filled in, the actor is checked (unless
    `bypass_actor_check` is set), the remote content limit is checked and
    the map is passed through `MRF.filter/1`;
  * with `fake` set, an unsaved activity with the id `"pleroma:fakeid"` is
    built and returned instead of touching the database;
  * otherwise containment is checked, the embedded object is inserted and the
    activity row is written (plus an expiration when "expires_at" is set).

Any other failing step is returned wrapped as `{:error, step_result}`.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(map)},
       {:ok, map, object} <- insert_full_object(map) do
    unsaved = %Activity{
      data: map,
      local: local,
      actor: map["actor"],
      recipients: recipients
    }

    {:ok, saved} = maybe_create_activity_expiration(Repo.insert(unsaved))

    # Splice in the child object if we have one.
    saved = Maps.put_if_present(saved, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => saved.id})

    {:ok, saved}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    failure ->
      {:error, failure}
  end
end
157
@doc """
Creates notifications for `activity`, creates or bumps its conversation and
streams out both the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
166
# Registers an expiration for a freshly inserted activity that carries an
# "expires_at" value in its data. A failing `ActivityExpiration.create/2`
# result is returned unchanged; every other input is passed through untouched.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  case ActivityExpiration.create(activity, expires_at) do
    {:ok, _expiration} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
174
# Creates or bumps the conversation for `activity` and marks it as read for
# the acting user. Returns `{:ok, conversation}`; when either lookup does not
# match, the non-matching value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        no_user ->
          no_user
      end

    no_conversation ->
      no_conversation
  end
end
182
# Returns the freshly (force-)preloaded participations of a conversation
# wrapped in `{:ok, _}`, or an empty list for any other input.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
190
@doc """
Preloads the `:user` association of `participations` and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end
198
@doc """
Streams the participations of the conversation that `object` belongs to.

The conversation is looked up by the object's "context". Participations are
only streamed when a direct activity in that context is still found for
`user`; otherwise `nil` is returned. A failed conversation lookup returns the
lookup result, and any other arguments yield `:noop`.
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      found = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(found.ap_id, lookup_opts) do
        stream_out_participations(found.participations)
      end

    not_found ->
      not_found
  end
end

def stream_out_participations(_, _), do: :noop
216
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed
by `Topics.get_activity_topics/1`. Everything else is ignored with `:noop`.
"""
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
227
@doc """
Runs `do_create/2` inside a database transaction and unwraps its result.
A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
234
# Builds and inserts a "Create" activity, then updates counters, notifies,
# streams and federates.
#
# Fake activities stop right after the insert. In the `:benchmark` environment
# the work after the replies counter is skipped. An `{:error, message}` from
# any step rolls the surrounding transaction back with `message`.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  base = %{to: to, actor: actor, published: published, context: context, object: object}
  create_data = make_create_data(base, additional)

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, inserted} -> {:ok, inserted}
    {:fake, true, faked} -> {:ok, faked}
    {:error, message} -> Repo.rollback(message)
  end
end
267
@doc """
Builds a listen activity from `params`, inserts it, then notifies, streams
and federates it.

`params` must contain `:to`, `:actor`, `:context` and `:object`;
`:additional`, `:local` and `:published` are optional. Only an explicit
`local: false` marks the activity as non-local.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, additional)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
287
@doc """
Inserts, streams and federates an "Accept" activity built from `params`.
"""
@spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
def accept(params), do: accept_or_reject("Accept", params)
292
@doc """
Inserts, streams and federates a "Reject" activity built from `params`.
"""
@spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
def reject(params), do: accept_or_reject("Reject", params)
297
# Shared implementation of accept/1 and reject/1.
#
# Builds the activity data of the given `type` from `:to`, `:actor` and
# `:object`, adds "id" when `:activity_id` is present, and inserts, streams
# and federates it. `:local` defaults to `true`.
@spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  base = %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
  data = Maps.put_if_present(base, "id", activity_id)

  with {:ok, activity} <- insert(data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
313
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps its result.
A non-`{:ok, _}` transaction result is returned unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
322
# Cancels the latest follow of `followed` by `follower` and publishes the
# matching unfollow activity.
#
# Returns `nil` when a step yields `nil` (e.g. no follow activity is found) and
# rolls the surrounding transaction back on `{:error, reason}`.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
336
@doc """
Creates a report (flag) activity and mails it to all superusers that have an
e-mail address.

`params` must contain `:actor`, `:context`, `:account`, `:statuses` and
`:content`. Unless `forward: false` is given the reported account is put in
"cc"; "to" is always empty. The activity handed to federation is the one
returned by `strip_report_status_data/1`.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  addressing =
    if forward do
      %{"to" => [], "cc" => [account.ap_id]}
    else
      %{"to" => [], "cc" => []}
    end

  additional = Map.merge(params[:additional] || %{}, addressing)

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    User.all_superusers()
    |> Enum.reject(fn superuser -> is_nil(superuser.email) end)
    |> Enum.each(fn superuser ->
      report_email =
        Pleroma.Emails.AdminEmail.report(superuser, actor, account, statuses, content)

      Pleroma.Emails.Mailer.deliver_async(report_email)
    end)

    {:ok, activity}
  end
end
376
@doc """
Publishes a "Move" activity from `origin` to `target` and enqueues the
"move_following" background job.

Fails with an error tuple unless `origin.ap_id` is listed in
`target.also_known_as`. Any other failing step is returned unchanged.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    failure -> failure
  end
end
402
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are restricted to the public collection plus, when `opts[:user]`
is set, that user and the accounts returned by `User.following/1`. Preloads,
block/filter restrictions, poll-vote exclusion and `:exclude_id` are driven
by `opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]
  user = opts[:user]

  recipients =
    if user do
      [user.ap_id | User.following(user)] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_filtered(opts)
  |> where(
    [a],
    fragment("?->>'type' = ? and ?->>'context' = ?", a.data, "Create", a.data, ^context)
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
432
@doc """
Returns the activities of `context` as selected by
`fetch_activities_for_context_query/2`.

`opts` may be a map or a keyword list. It is normalised to a map first,
because the filters applied by the query builder match on map patterns and
would otherwise silently ignore options given as a keyword list.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  context
  |> fetch_activities_for_context_query(Map.new(opts))
  |> Repo.all()
end
439
@doc """
Returns the id of the newest direct-visibility activity in `context`, or
`nil` when there is none.

`opts` may be a map or a keyword list (previously a keyword list raised in
`Map.merge/2`). `skip_preload: true` is used unless `opts` sets
`:skip_preload` itself.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # Same precedence as merging opts over the default: a caller-supplied
  # :skip_preload wins.
  query_opts =
    opts
    |> Map.new()
    |> Map.put_new(:skip_preload, true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
450
@doc """
Fetches a page of activities addressed to the public collection.

The `:user` option is dropped before querying. Unlisted activities are
filtered out only when `opts` contains `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
460
@doc """
Like `fetch_public_or_unlisted_activities/2`, but always sets
`restrict_unlisted: true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  listed_only = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(listed_only, pagination)
end
467
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` is the
# requested `:visibility` (a single value) or one of them (a list).
#
# An invalid request is logged and the query is returned unchanged, matching
# what `exclude_visibility/2` does. Previously these branches returned the
# result of `Logger.error/1` (`:ok`), which is not a query and broke the
# caller's pipeline. A `nil` visibility is treated as "no restriction", and the
# value is logged via `inspect/1` so that non-string input cannot make the log
# call itself raise.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
504
# Removes from `query` the activities whose `activity_visibility(...)` is the
# given `:exclude_visibilities` value (a single value) or one of them (a list).
#
# An invalid request is logged and the query is returned unchanged. The
# offending value is logged via `inspect/1`: interpolating it directly raises
# for terms that are not chardata (e.g. a list containing atoms, or a map),
# which would turn a bad filter into a crash.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not exclude visibility to #{inspect(visibility)}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      not fragment(
        "activity_visibility(?, ?, ?) = ?",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

defp exclude_visibility(query, _visibility), do: query
547
# Applies the `thread_visibility(...)` SQL check for the reading user.
#
# Skipped when the config map or the user has `skip_thread_containment: true`,
# and when the options carry no `%User{}` under `:user`.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: reader_ap_id}}, _config) do
  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id') = true", ^reader_ap_id, a.data)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
562
@doc """
Fetches the activities authored by `user` as seen by `reading_user`.

`params` is extended with `:user` (the reader) and `:actor_id` (the author's
AP id). The list returned by `fetch_activities/2` is reversed.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
577
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

The reader's blocks and mutes are applied unless the reader blocks `user`.
The list returned by `fetch_activities/2` is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
603
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user` using
offset pagination. The list returned by `fetch_activities/3` is reversed.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
615
# Recipient list used when reading a user's activities: empty in godmode,
# otherwise the public collection plus, for a given reader, the reader and the
# accounts returned by `User.following/1`.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | User.following(reading_user)]
  else
    [public]
  end
end
625
# Hides "Announce" activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when
# combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
645
# Keeps only activities with an id greater than `:since_id`; an empty string
# or a missing key leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
653
# Drops activities whose object carries any of the `:tag_reject` tags.
# Needs the joined object, so it raises when combined with `skip_preload: true`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
666
# Keeps only activities whose object carries all of the `:tag_all` tags.
# Needs the joined object, so it raises when combined with `skip_preload: true`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
679
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, so it raises when combined
# with `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
699
# Keeps activities whose recipients overlap with `recipients`. With a user
# given, activities authored by that user are added through `or_where`. An
# empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
713
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
719
# With `:actor_id` given, keeps only activities authored by that actor.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
725
# Keeps only activities of the given type: a binary is compared for equality,
# any other value is matched with `ANY(...)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
735
# With `:state` given, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
741
# With `:favorited_by` given, keeps only activities whose object lists that
# AP id under "likes".
defp restrict_favorited_by(query, %{favorited_by: liker_ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^liker_ap_id)
  )
end

defp restrict_favorited_by(query, _), do: query
750
# With `only_media: true`, keeps only "Create" activities whose object has a
# non-empty "attachment". Needs the joined object, so it raises when combined
# with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
764
# Filters replies.
#
#   * `exclude_replies: true` keeps only objects without "inReplyTo";
#   * `reply_visibility: "self"` additionally keeps replies addressed to the
#     `:reply_filtering_user`;
#   * `reply_visibility: "following"` additionally keeps replies addressed to
#     the user or the user's friends, and the user's own activities.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  visible_ap_ids = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^visible_ap_ids,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
808
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
814
# Hides activities authored by, or addressed ("to") to, accounts muted by the
# `:muting_user`. The muted AP ids come from `:muted_users_ap_ids` when given,
# otherwise from `User.muted_users_ap_ids/1`.
#
# Unless preloading is skipped, rows with a matching `thread_mute` binding are
# hidden as well. `with_muted: true` disables the whole restriction.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
834
# Hides activities involving accounts or domains blocked by the
# `:blocking_user`.
#
# Blocked AP ids come from `:blocked_users_ap_ids` when given, otherwise from
# `User.blocked_users_ap_ids/1`. The object is joined when the query has no
# `:object` binding yet. For the two domain checks, actors the user follows
# are exempt from the domain block.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [_activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
881
# With `restrict_unlisted: true`, drops activities that list the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
895
# With `pinned: true`, keeps only activities whose id is in
# `:pinned_activity_ids`.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _), do: query
901
# Hides "Announce" activities by accounts whose reblogs the `:muting_user` has
# muted. The AP ids come from `:reblog_muted_users_ap_ids` when given,
# otherwise from `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
918
# With `:instance` given, keeps only activities by users whose nickname ends
# in `@<instance>`. The matching AP ids are loaded eagerly with a separate
# query.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _), do: query
932
# Hides activities whose object content matches the regex returned by
# `Filter.compose_regex/1` for the user, except the user's own activities.
# The user is taken from `:user`, falling back to `:blocking_user`; nothing
# is filtered when no regex is composed.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
952
# Drops activities whose object is of type "Answer", unless
# `include_poll_votes: true` is set. Only applied when the query has an
# `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(query, [_activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))

    false ->
      query
  end
end
964
# Drops activities whose object is of type "ChatMessage", unless
# `include_chat_messages: true` is set. Only applied when the query has an
# `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true ->
      where(
        query,
        [_activity, object: o],
        fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
      )

    false ->
      query
  end
end
976
# Drops activities authored by invisible users, unless `invisible_actors: true`
# is set. The AP ids of invisible users are loaded eagerly with a separate
# query.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users =
    %{invisible: true, select: [:ap_id]}
    |> User.Query.build()
    |> Repo.all()

  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
987
# With a binary `:exclude_id`, drops the activity with that id.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _), do: query
993
# Preloads the activity's object unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1000
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1007
# Preloads report notes only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1014
# Sets the thread-muted field for `opts[:muting_user]`, falling back to
# `opts[:user]`, unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1021
# Orders by id in the direction given by `:order` (`:desc` or `:asc`); any
# other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1033
# Loads the AP ids needed by the block/mute/reblog-mute restrictions with one
# call to `User.outgoing_relationships_ap_ids/2` and returns three option
# maps, one per restriction, each carrying its preloaded list.
#
# Block ids are only loaded when `:blocking_user` is set and equals
# `:muting_user`. Values already present in `opts` take precedence over the
# preloaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_kinds = if source_user, do: [:mute, :reblog_mute], else: []

  relationship_kinds =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_kinds]
    else
      mute_kinds
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationship_kinds)

  blocked_opts = Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block])
  muted_opts = Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute])
  muted_reblogs_opts = Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])

  {blocked_opts, muted_opts, muted_reblogs_opts}
end
1055
@doc """
Builds the activity query for `recipients`, applying every preload,
restriction and exclusion selected by `opts`.

Block, mute and reblog-mute restrictions receive option maps enriched with
AP ids preloaded by `fetch_activities_query_ap_ids_ops/1`. Thread visibility
additionally honours the `[:instance, :skip_thread_containment]` setting.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # `restrict_filtered/2` used to be applied twice in this pipeline. The second
  # application added the same WHERE condition again and composed the filter
  # regex a second time, so it is applied once now.
  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1099
@doc """
Fetches one page of activities for `recipients`.

The result of `Pleroma.List.memberships/1` for `opts[:user]` is appended to
`recipients` before the query is built. The page returned by
`Pagination.fetch_paginated/3` is reversed and then passed through
`maybe_update_cc/3` together with those memberships and the user.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  viewer = opts[:user]
  list_memberships = Pleroma.List.memberships(viewer)
  addressed_to = recipients ++ list_memberships

  page =
    addressed_to
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, viewer)
end
1108
@doc """
Fetches the activities favourited by `user`, most recent favourite first.

Starts from the "Like" activities whose actor is `user.ap_id`, joins them through
`Activity.with_joined_object/1` and `Object.with_joined_activity/1`, and selects the
joined activity with its `object` set and `pagination_id` set to the id of the like.
Ordering is by like id (descending, nulls last), so `skip_order: true` is passed on
to the paginator.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([fav, obj, act], %{act | object: obj, pagination_id: fav.id})
    |> order_by([fav, _, _], desc_nulls_last: fav.id)

  pagination_params = Map.put(params, :skip_order, true)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1126
# Adds the viewing user to the "cc" of activities that match one of their list
# memberships.
#
# When `list_memberships` is non-empty and a `%User{}` is given, every activity whose
# "bcc" is a non-empty list containing at least one of those memberships gets
# `user_ap_id` prepended to `data["cc"]`. All other activities are returned unchanged,
# as is the whole list when there are no memberships or no user.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `update_in/2` hands `nil` to the function when "cc" is absent; wrapping
        # keeps the result a proper list in that case (and for a lone non-list value).
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1142
# Narrows `query` to activities matching either condition on `activity.recipients`:
#   * it satisfies the `&&` fragment against `recipients`, or
#   * it satisfies the `&&` fragment against `recipients_with_public` and also
#     contains the public address from `Constants.as_public/0`.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1151
@doc """
Fetches one page of activities bounded by two recipient lists.

The base query comes from `fetch_activities_query/2` with an empty recipient list and
is narrowed by `fetch_activities_bounded_query/3` using `recipients` and
`recipients_with_public`. The page returned by `Pagination.fetch_paginated/3` is
reversed before it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1163
@doc """
Stores `file` with `Upload.store/2` and inserts an `Object` holding the stored data.

The stored data is passed through `Maps.put_if_present/3` with the key "actor" and
the value of `opts[:actor]` before insertion. Any result of the store step other than
`{:ok, data}` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1172
# Extracts a URL string from the "url" property of an actor object.
#
# Accepts a plain string, a map with a string "href", or a list, in which case only
# the first element is considered (recursively). Everything else yields `nil`.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_url), do: nil
1184
# Converts a string-keyed actor object into the atom-keyed attribute map built below.
#
# Notable details:
#   * `avatar` / `banner` are "Image" maps built from `icon.url` / `image.url`, or a
#     falsy value when that URL is absent.
#   * `fields` keeps "name"/"value" of every "PropertyValue" attachment.
#   * `emoji` maps each Emoji tag name (surrounding ":" trimmed) to its icon URL.
#   * `locked` and `accepts_chat_messages` are read from the object as received;
#     everything after the `Transmogrifier.maybe_fix_user_object/1` call reads the
#     object it returns.
#   * `nickname` is "preferredUsername@host-of-id", or `nil` without a
#     "preferredUsername".
#
# Malformed "attachment" / "tag" entries are skipped rather than raising, and a
# missing, `nil` or single (non-list) value is accepted for both properties.
defp object_to_user_data(data) do
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # The generator pattern filters: entries that are not maps, have no "type", or have
  # another type are dropped without raising.
  fields =
    for %{"type" => "PropertyValue"} = attachment <- List.wrap(data["attachment"]) do
      Map.take(attachment, ["name", "value"])
    end

  # Emoji tags without an icon URL or with a non-string name are ignored; as before,
  # the last tag wins when a name occurs more than once.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1273
@doc """
Fetches the following and follower collections of `user` and summarises them.

Both `user.following_address` and `user.follower_address` are fetched with
`Fetcher.fetch_and_contain_remote_object_from_id/1` and checked with
`collection_private/1`. On success returns `{:ok, map}` with `:hide_follows`,
`:hide_followers`, `:following_count` and `:follower_count`; the counts come from
each collection's "totalItems" and fall back to 0 when that is not an integer.

An `{:error, _}` from any step is returned unchanged; any other unexpected value is
wrapped as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    visibility = %{hide_follows: follows_hidden, hide_followers: followers_hidden}

    counters = %{
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"])
    }

    {:ok, Map.merge(visibility, counters)}
  else
    {:error, _} = error -> error
    unexpected -> {:error, unexpected}
  end
end
1293
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1296
@doc """
Merges remote follow information into `user_data[:info]` when all checks pass.

The information is fetched with `fetch_follow_information_for_user/1` only if
`[:instance, :external_user_synchronization]` is `true`, `user_data[:type]` is
"Person" or "Service", and both `:following_address` and `:follower_address` are set.
When one of these checks is `false`, `user_data` is returned unchanged. Any other
outcome (including a failed fetch) is logged as an error and `user_data` is returned
unchanged as well.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): this checks `:type`, while `object_to_user_data/1` in this module
  # stores the actor type under `:actor_type` — confirm which key callers provide.
  with {:enabled, true} <-
         {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          if(user_data[:following_address] && user_data[:follower_address],
            do: true,
            else: false
          )},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, follow_info)
    Map.put(user_data, :info, merged_info)
  else
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <>
          inspect(failure)
      )

      user_data
  end
end
1327
# Decides whether a follow collection should be treated as private.
#
#   * An embedded "first" page of type CollectionPage/OrderedCollectionPage means the
#     collection is readable: `{:ok, false}`.
#   * Any other "first" value is fetched; a page of one of those types gives
#     `{:ok, false}`, an error carrying an HTTP 401/403 response gives `{:ok, true}`,
#     other `{:error, _}` tuples are returned unchanged and anything else is wrapped
#     as `{:error, value}`.
#   * Data without a "first" key counts as private: `{:ok, true}`.
defp collection_private(%{"first" => %{"type" => type}})
     when type == "CollectionPage" or type == "OrderedCollectionPage" do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: code}}} when code in [401, 403] ->
      {:ok, true}

    {:error, _} = error ->
      error

    unexpected ->
      {:error, unexpected}
  end
end

defp collection_private(_data), do: {:ok, true}
1344
@doc """
Passes an actor object through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`. Any other filter
result is returned wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1352
@doc """
Fetches the actor at `ap_id` and turns it into user data.

On success returns `{:ok, user_data}`, where the user data has been passed through
`maybe_update_follow_information/1`. An `{:error, reason}` from the fetch or the
conversion is returned unchanged after being logged: at debug level for deleted
objects, at info level for `{:reject, _}` reasons, and at error level otherwise.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} = error ->
      case reason do
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      error
  end
end
1371
@doc """
Renames an existing user whose nickname clashes with the one in `data`.

Looks up `data[:nickname]` with `User.get_by_nickname/1`. If a user is found under a
different ap id, its nickname is changed to "<id>.<nickname>" and the result of
`User.update_and_set_cache/1` is returned. If the found user has the same ap id as
`data[:ap_id]`, this is only logged. When no user is found, `nil` is returned.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]
  new_ap_id = data[:ap_id]

  case User.get_by_nickname(nickname) do
    %User{ap_id: old_ap_id} when old_ap_id == new_ap_id ->
      Logger.info(
        "Found an old user for #{nickname}, but the ap id #{new_ap_id} is the same as the new user. Race condition? Not changing anything."
      )

    %User{} = old_user ->
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{new_ap_id}, renaming."
      )

      freed_nickname = "#{old_user.id}.#{old_user.nickname}"

      old_user
      |> User.remote_user_changeset(%{nickname: freed_nickname})
      |> User.update_and_set_cache()

    _ ->
      nil
  end
end
1396
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched with
`fetch_and_prepare_user_from_ap_id/1`: a cached user is updated with the fetched
data, and without one a new user is inserted after any clashing nickname has been
dealt with. A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  cached_user = User.get_cached_by_ap_id(ap_id)

  if cached_user && !User.ap_enabled?(cached_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} ->
        if cached_user do
          cached_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1419
@doc """
Resolves `nickname` with `WebFinger.finger/1` and builds the user from its ap id.

Returns the result of `make_user_from_ap_id/1` when the WebFinger result is
`{:ok, map}` with a non-nil "ap_id", and `{:error, "No AP id in WebFinger"}`
otherwise.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1427
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2` for the
# given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1432
# Post-processing check for a single activity; currently this only applies
# `contain_broken_threads/2` and returns its result.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1437
@doc """
Builds a query for activities restricted to type "Create" and visibility "direct",
ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1444 end