Merge branch '1668-prometheus-access-restrictions' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
# Computes the recipient list for an activity map.
#
# Returns `{recipients, to, cc}`, where `to` and `cc` are handed back exactly as
# found in `data` (defaulting to `[]` when the key is absent).
#
# For "Create" activities the actor is appended to the recipients and duplicates
# are removed. A missing or `nil` actor contributes nothing; previously it leaked
# a stray `[]`/`nil` element into the recipient list.
#
# For every other activity type the recipients are the plain concatenation of
# `to`, `cc` and `bcc` (no de-duplication, actor not added).
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  # List.wrap/1 maps nil -> [] and a single actor id -> [actor].
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
51
# Tells whether the actor behind an AP id may author activities.
#
# A `nil` actor passes. A binary AP id passes only when the cached user exists
# and is not deactivated; an unknown actor fails.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
60
# Checks the object's "content" length against the `[:instance, :remote_limit]`
# setting. Activities without a (non-nil) object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

defp check_remote_limit(_), do: true
67
@doc """
Calls `User.increase_note_count/1` for `actor` when `is_public?(object)` holds.

Returns `{:ok, actor}` untouched otherwise.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
71
@doc """
Calls `User.decrease_note_count/1` for `actor` when `is_public?(object)` holds.

Returns `{:ok, actor}` untouched otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# For a "Create" whose embedded object has an "inReplyTo", bumps the replies
# count of the replied-to object, but only when the new object is public
# (returns `nil` when it is not). Any other input is a `:noop`.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
86
@object_types ~w[ChatMessage Question Answer Audio Video Event Article]

@doc """
Stores `object` and returns it together with the unchanged `meta`.

Maps whose "type" is one of `@object_types` are stored through
`Object.create/1`. Everything else is inserted as an `Activity`; in that case
`meta` must contain `:local` (a missing key raises `KeyError`), and an
expiration job is scheduled when the activity carries an "expires_at"
`DateTime`.

On success the result is `{:ok, stored, meta}`. When a step does not succeed,
its return value (e.g. `{:error, changeset}` from `Repo.insert/1`) is passed
through unchanged.
"""
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  with {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }),
       # TODO: add tests for expired activities, when Note type will be supported in new pipeline
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
110
@doc """
Inserts an activity described by `map`.

Steps, in order: return the already-known activity if `Activity.normalize/1`
finds one; fill in activity defaults; check that the actor is active (skipped
when `bypass_actor_check` is true); check the remote content limit; run the MRF
filter; check containment of the child object; insert the full object; insert
the activity (scheduling its expiration when applicable) and enqueue the
"fetch_data_for_activity" background job.

When `fake` is true nothing is stored: after the MRF step an `%Activity{}` with
the id "pleroma:fakeid" is built and returned.

Failure values:

  * `{:error, false}` - actor check failed
  * `{:error, :remote_limit}` - content longer than the remote limit
  * `{:error, {:reject, _}}` - a `{:reject, _}` tuple surfaced (the MRF step is
    the visible candidate)
  * `{:containment, _}` - containment check failed
  * `{:error, _}` - any other error tuple, passed through
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(data["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _, _} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(data)},
       {:ok, data, object} <- insert_full_object(data),
       {:ok, activity} <- insert_activity_with_expiration(data, local, recipients) do
    # Splice in the child object if we have one.
    activity_with_object = Maps.put_if_present(activity, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{
      "activity_id" => activity_with_object.id
    })

    {:ok, activity_with_object}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:actor_check, _} ->
      {:error, false}

    {:containment, _} = containment_failure ->
      containment_failure

    {:error, _} = failure ->
      failure

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, _} = rejection ->
      {:error, rejection}
  end
end
161
# Inserts an `%Activity{}` built from the given data and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
174
@doc """
Creates notifications for `activity`, creates or bumps its conversation, then
streams out the activity followed by the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
183
# Enqueues a `PurgeExpiredActivity` job when the activity data carries an
# "expires_at" `DateTime`. Returns `{:ok, activity}` on success (or when there is
# nothing to schedule); a failed enqueue result is returned unchanged.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
197
# Creates or bumps the conversation for `activity` and marks it as read for the
# acting user. Returns `{:ok, conversation}`; if either lookup does not yield the
# expected shape, that value is returned instead.
defp create_or_bump_conversation(activity, actor) do
  with {:ok, conv} <- Conversation.create_or_bump_for(activity),
       %User{} = author <- User.get_cached_by_ap_id(actor) do
    Participation.mark_as_read(author, conv)
    {:ok, conv}
  end
end
205
# Returns the freshly (force-)preloaded participations of a conversation given
# as `{:ok, conversation}`; any other input yields `[]`.
defp get_participations({:ok, conversation}) do
  preloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(preloaded, :participations)
end

defp get_participations(_), do: []
213
@doc """
Preloads the `:user` of each participation and pushes them to the
"participation" stream.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

@doc """
Streams out the participations of the conversation matching the object's
"context", provided a direct activity in that context can still be found for
`user`.

Returns `:noop` when the first argument is not an `%Object{}` with a "context".
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  with %Conversation{} = found <- Conversation.get_for_ap_id(context) do
    conversation = Repo.preload(found, :participations)

    latest_direct_id =
      fetch_latest_direct_activity_id_for_context(
        conversation.ap_id,
        %{user: user, blocking_user: user}
      )

    if latest_direct_id, do: stream_out_participations(conversation.participations)
  end
end

def stream_out_participations(_, _), do: :noop
239
@doc """
Streams "Create", "Announce" and "Delete" activities to the topics computed by
`Topics.get_activity_topics/1`. Anything else is a `:noop`.
"""
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
250
@doc """
Runs `do_create/2` inside a database transaction and unwraps its result.

A rolled-back transaction is returned as the `{:error, reason}` produced by
`Repo.transaction/1`.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
257
# Builds and stores a "Create" activity from `params`.
#
# After a successful insert, unless the activity is fake or the `:env` config is
# `:benchmark` (quick insert), it bumps the reply and note counters, notifies,
# streams and federates. An `{:error, reason}` from any step rolls the
# surrounding transaction back with `reason`.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  extra_fields = params[:additional] || %{}
  # only accept false as false value
  local? = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_create_data(extra_fields)

  with {:ok, activity} <- insert(create_data, local?, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, stored} -> {:ok, stored}
    {:fake, true, faked} -> {:ok, faked}
    {:error, reason} -> Repo.rollback(reason)
  end
end
290
@doc """
Builds listen data from `params`, inserts it as an activity, then notifies,
streams and federates it.

`params[:local]` counts as false only when it is exactly `false`. Any step that
does not succeed has its value returned unchanged.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  extra_fields = params[:additional] || %{}
  # only accept false as false value
  local? = params[:local] != false

  listen_data =
    %{to: to, actor: actor, published: params[:published], context: context, object: object}
    |> make_listen_data(extra_fields)

  with {:ok, activity} <- insert(listen_data, local?),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
310
@doc """
Runs `do_unfollow/4` inside a database transaction and unwraps its result.

Yields `nil` when no follow activity exists between the two users.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    rolled_back -> rolled_back
  end
end
319
# Marks the latest follow activity between the two users as "cancelled", inserts
# the matching unfollow activity, then notifies, streams and federates it.
# Returns `nil` when there is no follow activity; an `{:error, reason}` rolls the
# surrounding transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(undo_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    nil -> nil
    {:error, reason} -> Repo.rollback(reason)
  end
end
333
@doc """
Creates a flag activity from `params`.

`params[:local]` and `params[:forward]` count as false only when they are
exactly `false`. The additional data always gets an empty "to"; "cc" holds
`account.ap_id` when forwarding and is empty otherwise.

After the insert the activity is notified/streamed, a version processed by
`strip_report_status_data/1` is federated, and an admin report email is queued
for every superuser that has an email address.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local? = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local?),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(), not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
373
@doc """
Inserts a "Move" activity from `origin` to `target`.

Requires `origin.ap_id` to be listed in `target.also_known_as`; otherwise an
error tuple with an explanatory message is returned. On success the activity is
notified/streamed and federated, and a "move_following" background job is
enqueued.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  with true <- origin.ap_id in target.also_known_as,
       {:ok, activity} <- insert(move_data, local),
       _ <- notify_and_stream(activity) do
    maybe_federate(activity)

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}
    BackgroundWorker.enqueue("move_following", job_args)

    {:ok, activity}
  else
    false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
    other -> other
  end
end
399
@doc """
Builds the query for "Create" activities belonging to `context`, newest first.

Recipients are the public collection plus, when `opts[:user]` is set, that
user's AP id and the collections returned by `User.following/1`. Block, filter,
poll-vote and `:exclude_id` restrictions from `opts` are applied.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query = from(activity in Activity)

  base_query
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_filtered(opts)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
429
@doc """
Runs `fetch_activities_for_context_query/2` and returns all matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
436
@doc """
Returns the id of the newest direct-visibility activity in `context`, or `nil`
when there is none.

`skip_preload: true` is used unless `opts` overrides it.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
447
@doc """
Fetches a page of activities addressed to the public collection.

`:user` is dropped from `opts` before querying; unlisted activities are
filtered out only when `opts` has `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)
  public_recipients = [Constants.as_public()]

  public_recipients
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
end
457
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `restrict_unlisted: true`
forced into `opts`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
464
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(...)` SQL value equals
# the requested `:visibility` (a single value) or is one of them (a list).
#
# An unsupported value is logged and the query is returned unrestricted, the
# same way `exclude_visibility/2` behaves. Previously these branches returned
# the result of `Logger.error/1`, which broke the query pipeline downstream. A
# missing or `nil` `:visibility` leaves the query untouched without logging.
#
# The rejected value is logged with `inspect/1` because interpolating it
# directly can itself raise (e.g. a list holding atoms, or a map).
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
501
# Removes from `query` the activities whose `activity_visibility(...)` SQL value
# equals the `:exclude_visibilities` option (single value) or is one of them
# (list).
#
# An unsupported value is logged and the query is returned unchanged; a missing
# or `nil` option is a no-op. The rejected value is logged with `inspect/1`
# because interpolating it directly can raise (e.g. a list holding atoms, or a
# map), which would turn a logged warning into a crash.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not exclude visibility to #{inspect(visibility)}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      not fragment(
        "activity_visibility(?, ?, ?) = ?",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

defp exclude_visibility(query, _visibility), do: query
544
# Applies the `thread_visibility(ap_id, activity id)` SQL check for the reading
# user. Skipped when the config map or the user has `skip_thread_containment`
# set to true, or when no `%User{}` is present in the options.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  where(
    query,
    [activity],
    fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, activity.data)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
559
@doc """
Fetches activities authored by `user` as seen by `reading_user`, without
restricting the activity type.

`:user` and `:actor_id` in `params` are overwritten. With `params[:godmode]`
set to true no recipient restriction is applied.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  query_params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: query_params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(query_params)
  |> Enum.reverse()
end
574
@doc """
Fetches the "Create" and "Announce" activities authored by `user` as seen by
`reading_user`.

Block and mute filtering on behalf of `reading_user` is enabled unless
`reading_user` blocks `user`. With `params[:godmode]` set to true no recipient
restriction is applied.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  query_params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: query_params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(query_params)
  |> Enum.reverse()
end
600
@doc """
Fetches "Create" and "Announce" activities visible to `reading_user`, using
offset pagination.

With `params[:godmode]` set to true no recipient restriction is applied.
"""
def fetch_statuses(reading_user, params) do
  status_params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: status_params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(status_params, :offset)
  |> Enum.reverse()
end
612
# Recipient list used when listing a user's activities: empty in godmode,
# otherwise the public collection plus, when there is a reading user, their AP
# id and whatever `User.following/1` returns for them.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  public = Constants.as_public()

  case reading_user do
    nil -> [public]
    false -> [public]
    reader -> [public, reader.ap_id | User.following(reader)]
  end
end
622
# With `:announce_filtering_user`, drops "Announce" activities whose announced
# object was authored by that user. Needs the joined object, hence the raise
# when combined with `skip_preload: true`.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
642
# Keeps only activities with an id greater than `:since_id`. An empty-string or
# missing `:since_id` leaves the query untouched.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
650
# With a non-empty `:tag_reject` list, drops activities whose object carries any
# of those tags. Needs the joined object, hence the raise when combined with
# `skip_preload: true`.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
663
# With a non-empty `:tag_all` list, keeps only activities whose object carries
# every one of those tags. Needs the joined object, hence the raise when
# combined with `skip_preload: true`.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _), do: query
676
# Keeps only activities whose object carries the given `:tag` (binary) or any of
# the given tags (list). Needs the joined object, hence the raise when combined
# with `skip_preload: true`.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _), do: query
696
# Keeps activities whose recipients overlap `recipients`. When a user is given,
# an `or_where` additionally lets through activities authored by that user. An
# empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
710
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
716
# With `:actor_id`, keeps only activities whose actor equals it.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
722
# Keeps only activities whose data "type" equals `:type` (binary) or is any of
# the given types (any other value is sent to SQL `ANY(...)`).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _), do: query
732
# With `:state`, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
738
# With `:favorited_by`, keeps only activities whose object lists that AP id in
# its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
747
# With `only_media: true`, keeps only "Create" activities whose object
# "attachment" is not an empty list. Needs the joined object, hence the raise
# when `:only_media` is combined with `skip_preload: true`.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
761
# Reply filtering:
#
#   * `exclude_replies: true` - keep only objects without "inReplyTo"
#   * `reply_visibility: "self"` - keep non-replies and replies that have the
#     filtering user among the activity recipients
#   * `reply_visibility: "following"` - see the SQL comments in the fragment
#
# The two `reply_visibility` modes require `:reply_filtering_user` to be a
# `%User{}`; otherwise the query is left untouched.
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
815
# With `exclude_reblogs: true`, drops "Announce" activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
821
# Hides activities from muted users for `:muting_user`:
#
#   * activities authored by a muted user are dropped;
#   * activities addressed ("to") to a muted user are dropped unless the muting
#     user is the author;
#   * unless `:skip_preload` is set, rows with a `thread_mute` user id are
#     dropped too (uses the `:thread_mute` named binding).
#
# The muted AP ids come from `opts[:muted_users_ap_ids]` when present, otherwise
# from `User.muted_users_ap_ids/1`. `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
848
# Hides blocked content for `:blocking_user`. Joins the object when the query has
# no `:object` named binding yet, then requires all of:
#
#   * the actor is not blocked;
#   * no recipient is blocked, unless the blocking user is the author;
#   * `recipients_contain_blocked_domains(...)` is false;
#   * it is not an "Announce" addressed ("to") to a blocked user;
#   * the actor's domain is not blocked, unless the actor is among the user's
#     friends;
#   * the object's actor domain is not blocked, unless that actor is among the
#     user's friends.
#
# Blocked AP ids come from `opts[:blocked_users_ap_ids]` when present, otherwise
# from `User.blocked_users_ap_ids/1`.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []

  following_ap_ids = User.get_friends_ap_ids(user)

  query_with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  query_with_object
  |> where(
    [activity, object: o],
    fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids)
  )
  |> where(
    [activity, object: o],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [activity, object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
902
# With `restrict_unlisted: true`, drops activities that have the public
# collection in their "cc" (a missing "cc" is treated as empty).
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _), do: query
916
# With `pinned: true` and `:pinned_activity_ids`, keeps only activities whose id
# is in that list.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
922
# For `:muting_user`, drops "Announce" activities authored by users whose
# reblogs are muted. The AP ids come from `opts[:reblog_muted_users_ap_ids]`
# when present, otherwise from `User.reblog_muted_users_ap_ids/1`.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
939
# With `:instance`, keeps only activities whose actor is a user with a nickname
# ending in "@<instance>". The matching AP ids are loaded with a separate query
# first.
# NOTE(review): `instance` goes into the LIKE pattern unescaped, so `%`/`_` in it
# act as wildcards - confirm callers validate it.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    User
    |> where([u], fragment("? LIKE ?", u.nickname, ^nickname_pattern))
    |> select([u], u.ap_id)
    |> Repo.all()

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _), do: query
953
# Applies the user's content filters: when `Filter.compose_regex/1` yields a
# regex, activities whose object "content" matches it (case-insensitively) are
# dropped unless the user authored them. The user is taken from `:user`, falling
# back to `:blocking_user`.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
973
# Drops activities whose object type is "Answer", unless
# `include_poll_votes: true` is set or the query has no `:object` named binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
985
# Drops activities whose object type is "ChatMessage", unless
# `include_chat_messages: true` is set or the query has no `:object` named
# binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [activity, object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
997
# Drops activities authored by users returned by the `invisible: true` user
# query, unless `invisible_actors: true` is set. The AP ids are loaded with a
# separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
1008
# With a binary `:exclude_id`, drops the activity with that id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
1014
# Adds the object preload via `Activity.with_preloaded_object/1` unless
# `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
1021
# Adds the bookmark preload for `opts[:user]` via
# `Activity.with_preloaded_bookmark/2` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
1028
# Adds the report-notes preload via `Activity.with_preloaded_report_notes/1`
# only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
1035
# Applies `Activity.with_set_thread_muted_field/2` for `opts[:muting_user]`
# (falling back to `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1042
# Orders by id when `:order` is `:desc` or `:asc`; otherwise leaves the query
# unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1054
# Loads the muting user's outgoing relationship AP ids in one call and returns
# three copies of `opts` for `restrict_blocked/2`, `restrict_muted/2` and
# `restrict_muted_reblogs/2`, each with the matching preloaded list added
# (values already present in `opts` win).
#
# Mutes and reblog mutes are requested when `opts[:muting_user]` is set; blocks
# only when `opts[:blocking_user]` is the same as the muting user.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1076
@doc """
Builds the activity query for `recipients`, applying every preload, ordering,
restriction and exclusion that `opts` asks for.

Block, mute and reblog-mute restrictions receive copies of `opts` enriched with
relationship AP ids preloaded by `fetch_activities_query_ap_ids_ops/1`. Thread
containment honours the `[:instance, :skip_thread_containment]` setting.

The content filter (`restrict_filtered/2`) is applied once; it used to be piped
in twice, which only duplicated the WHERE clause and the regex composition.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1120
@doc """
Fetches one page of activities addressed to `recipients`.

The list memberships of `opts[:user]` are added to the recipients before the
query is built. The paginated result is reversed and then passed through
`maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
1129
@doc """
Fetches the activities favourited by `user`.

Results are ordered by the id of the `Like` activity, descending, and that id
is exposed as `pagination_id` on every returned activity. Pagination is asked
to skip its own ordering (`skip_order: true`).
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _object, _activity], desc_nulls_last: like.id)

  Pagination.fetch_paginated(
    favourites_query,
    Map.put(params, :skip_order, true),
    pagination
  )
end
1147
# Adds the given user to "cc" for activities that were addressed to one of
# their list memberships.
#
# When `list_memberships` is non-empty and a `%User{}` is given, every activity
# whose "bcc" is a non-empty list containing one of those memberships gets the
# user's ap id prepended to its "cc". All other activities, and calls without
# memberships or without a user, are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a bare value; wrap it so the result is
        # always a proper list instead of an improper `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
1163
# Narrows `query` to activities whose recipients overlap `recipients`, or
# whose recipients overlap `recipients_with_public` and also include the
# public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1172
@doc """
Fetches one page of activities bounded by two recipient sets.

The base query is built without recipients and then narrowed by
`fetch_activities_bounded_query/3`. The paginated result is reversed before
it is returned.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1184
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` for it.

When `opts[:actor]` is present it is recorded under the "actor" key of the
object data. Any result of `Upload.store/2` other than `{:ok, data}` is
returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
1193
# Extracts a URL from an actor's "url" value. The value may be a plain string,
# a map with a string "href", or a list of either (only the first element is
# considered). Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url([first | _rest]), do: get_actor_url(first)
defp get_actor_url(_other), do: nil
1205
# Converts a fetched actor object into the attribute map used to build or
# update a remote user.
#
# Avatar, banner, profile fields, custom emoji, the "locked" flag and the
# chat-message capability are read from the object as received; every other
# attribute is read after `Transmogrifier.maybe_fix_user_object/1` has been
# applied.
#
# The object comes from a remote server, so its optional parts are parsed
# leniently: "icon"/"image"/"capabilities" values that are not maps of the
# expected shape, and "attachment"/"tag" entries that do not match the
# expected shape, are ignored instead of raising.
defp object_to_user_data(data) do
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  # List.wrap/1 accepts a missing/null value, a single object, or a list.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(data["attachment"]) do
      Map.take(field, ["name", "value"])
    end

  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(data["tag"]),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false

  accepts_chat_messages =
    case data["capabilities"] do
      %{"acceptsChatMessages" => value} -> value
      _ -> nil
    end

  # Everything below reads from the fixed-up object.
  data = Transmogrifier.maybe_fix_user_object(data)

  public_key =
    case data["publicKey"] do
      %{"publicKeyPem" => pem} when is_binary(pem) -> pem
      _ -> nil
    end

  shared_inbox =
    case data["endpoints"] do
      %{"sharedInbox" => inbox} when is_binary(inbox) -> inbox
      _ -> nil
    end

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: data["discoverable"] || false,
    invisible: data["invisible"] || false,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: data["type"] || "Person",
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    nickname: nickname
  }
end
1294
@doc """
Fetches the following and follower collections of `user` and summarises them.

Both collections are fetched with `force_http: true`. On success the result
is `{:ok, map}` with `:following_count` and `:follower_count` (the
collection's "totalItems" when it is an integer, otherwise 0) plus
`:hide_follows` and `:hide_followers`. Every failure is returned as an
`{:error, reason}` tuple.
"""
def fetch_follow_information_for_user(user) do
  fetch_collection = fn address ->
    Fetcher.fetch_and_contain_remote_object_from_id(address, force_http: true)
  end

  with {:ok, following} <- fetch_collection.(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <- fetch_collection.(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    summary = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, summary}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
1316
# Returns `counter` when it is an integer and 0 for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1319
@doc """
Merges freshly fetched follow information into `user_data[:info]`.

The fetch only happens when external user synchronization is enabled, the
`:type` of `user_data` is "Person" or "Service", and both collection
addresses are present. If any of these checks is false, `user_data` is
returned unchanged. If the fetch fails, the failure is logged and `user_data`
is returned unchanged.
"""
def maybe_update_follow_information(user_data) do
  # NOTE(review): the map built by object_to_user_data/1 in this file stores
  # the actor type under :actor_type and has no :type key, so for that input
  # the type check below is always false. Confirm which key is intended.
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, follow_info} <- fetch_follow_information_for_user(user_data) do
    Map.put(user_data, :info, Map.merge(user_data[:info] || %{}, follow_info))
  else
    {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1350
# Decides whether a follow collection is hidden.
#
# * An embedded "first" page of type CollectionPage/OrderedCollectionPage
#   means the collection is visible: `{:ok, false}`.
# * Any other "first" value is fetched. A page of one of those types means
#   visible; a 401 or 403 response means hidden (`{:ok, true}`); every other
#   outcome is returned as an `{:error, reason}` tuple.
# * Data without "first" is treated as hidden.
defp collection_private(%{"first" => %{"type" => page_type}})
     when page_type in ["CollectionPage", "OrderedCollectionPage"] do
  {:ok, false}
end

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => page_type}}
    when page_type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    {:error, {:ok, %{status: status}}} when status in [401, 403] ->
      {:ok, true}

    {:error, _reason} = error ->
      error

    other ->
      {:error, other}
  end
end

defp collection_private(_data), do: {:ok, true}
1367
@doc """
Runs the actor object through `MRF.filter/1` and converts it to user data.

Returns `{:ok, user_data}` when the filter returns `{:ok, object}`. Any other
filter result is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1375
@doc """
Fetches the actor at `ap_id` and turns it into user data.

On success returns `{:ok, user_data}`, after `user_data` has been passed
through `maybe_update_follow_information/1`. On `{:error, reason}` the
failure is logged and returned unchanged. The log level depends on the
reason: debug for a deleted object, info for a `{:reject, _}` reason, and
error for everything else.
"""
def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    {:error, reason} ->
      case reason do
        "Object has been deleted" ->
          Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")

        {:reject, why} ->
          Logger.info("Rejected user #{ap_id}: #{inspect(why)}")

        _ ->
          Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(reason)}")
      end

      {:error, reason}
  end
end
1394
@doc """
Frees up `data[:nickname]` when an existing user already holds it.

If a user with that nickname exists under a different ap id, that user is
renamed to `"<id>.<nickname>"`. If the ap id is the same, a message is logged
and nothing is changed. Returns `nil` when the nickname is not a string or no
such user exists.
"""
def maybe_handle_clashing_nickname(data) do
  nickname = data[:nickname]

  with true <- is_binary(nickname),
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      old_user
      |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
      |> User.update_and_set_cache()
    end
  else
    _ -> nil
  end
end
1418
@doc """
Creates or refreshes the user stored for `ap_id`.

A cached user that is not AP-enabled is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched:
an existing user is updated with the fetched data, and a new user is inserted
after any nickname clash has been handled. A failed fetch is returned
unchanged.
"""
def make_user_from_ap_id(ap_id, opts \\ []) do
  existing_user = User.get_cached_by_ap_id(ap_id)

  if existing_user && !User.ap_enabled?(existing_user) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id, opts) do
      {:ok, data} ->
        if existing_user do
          existing_user
          |> User.remote_user_changeset(data)
          |> User.update_and_set_cache()
        else
          maybe_handle_clashing_nickname(data)

          data
          |> User.remote_user_changeset()
          |> Repo.insert()
          |> User.set_cache()
        end

      failure ->
        failure
    end
  end
end
1441
@doc """
Resolves `nickname` through WebFinger and creates the user from the ap id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not return a
non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1449
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1454
# Post-processing for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1459
@doc """
Builds a query for `Create` activities with "direct" visibility, ordered by
ascending activity id.
"""
def fetch_direct_messages_query do
  create_activities = restrict_type(Activity, %{type: "Create"})
  direct_only = restrict_visibility(create_activities, %{visibility: "direct"})

  order_by(direct_only, [a], asc: a.id)
end
1466 end