Restrict statuses that contain user's irreversible filters
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.ActivityExpiration
9 alias Pleroma.Config
10 alias Pleroma.Constants
11 alias Pleroma.Conversation
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Filter
14 alias Pleroma.Maps
15 alias Pleroma.Notification
16 alias Pleroma.Object
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
20 alias Pleroma.Repo
21 alias Pleroma.Upload
22 alias Pleroma.User
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28
29 import Ecto.Query
30 import Pleroma.Web.ActivityPub.Utils
31 import Pleroma.Web.ActivityPub.Visibility
32
33 require Logger
34 require Pleroma.Constants
35
36 defp get_recipients(%{"type" => "Create"} = data) do
37 to = Map.get(data, "to", [])
38 cc = Map.get(data, "cc", [])
39 bcc = Map.get(data, "bcc", [])
40 actor = Map.get(data, "actor", [])
41 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
42 {recipients, to, cc}
43 end
44
45 defp get_recipients(data) do
46 to = Map.get(data, "to", [])
47 cc = Map.get(data, "cc", [])
48 bcc = Map.get(data, "bcc", [])
49 recipients = Enum.concat([to, cc, bcc])
50 {recipients, to, cc}
51 end
52
53 defp check_actor_is_active(nil), do: true
54
55 defp check_actor_is_active(actor) when is_binary(actor) do
56 case User.get_cached_by_ap_id(actor) do
57 %User{deactivated: deactivated} -> not deactivated
58 _ -> false
59 end
60 end
61
62 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
63 limit = Config.get([:instance, :remote_limit])
64 String.length(content) <= limit
65 end
66
67 defp check_remote_limit(_), do: true
68
69 defp increase_note_count_if_public(actor, object) do
70 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
71 end
72
73 def decrease_note_count_if_public(actor, object) do
74 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
75 end
76
77 defp increase_replies_count_if_reply(%{
78 "object" => %{"inReplyTo" => reply_ap_id} = object,
79 "type" => "Create"
80 }) do
81 if is_public?(object) do
82 Object.increase_replies_count(reply_ap_id)
83 end
84 end
85
86 defp increase_replies_count_if_reply(_create_data), do: :noop
87
88 defp increase_poll_votes_if_vote(%{
89 "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
90 "type" => "Create",
91 "actor" => actor
92 }) do
93 Object.increase_vote_count(reply_ap_id, name, actor)
94 end
95
96 defp increase_poll_votes_if_vote(_create_data), do: :noop
97
98 @object_types ["ChatMessage"]
99 @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
100 def persist(%{"type" => type} = object, meta) when type in @object_types do
101 with {:ok, object} <- Object.create(object) do
102 {:ok, object, meta}
103 end
104 end
105
106 def persist(object, meta) do
107 with local <- Keyword.fetch!(meta, :local),
108 {recipients, _, _} <- get_recipients(object),
109 {:ok, activity} <-
110 Repo.insert(%Activity{
111 data: object,
112 local: local,
113 recipients: recipients,
114 actor: object["actor"]
115 }) do
116 {:ok, activity, meta}
117 end
118 end
119
120 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
121 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
122 with nil <- Activity.normalize(map),
123 map <- lazy_put_activity_defaults(map, fake),
124 true <- bypass_actor_check || check_actor_is_active(map["actor"]),
125 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
126 {:ok, map} <- MRF.filter(map),
127 {recipients, _, _} = get_recipients(map),
128 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
129 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
130 {:ok, map, object} <- insert_full_object(map) do
131 {:ok, activity} =
132 %Activity{
133 data: map,
134 local: local,
135 actor: map["actor"],
136 recipients: recipients
137 }
138 |> Repo.insert()
139 |> maybe_create_activity_expiration()
140
141 # Splice in the child object if we have one.
142 activity = Maps.put_if_present(activity, :object, object)
143
144 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
145
146 {:ok, activity}
147 else
148 %Activity{} = activity ->
149 {:ok, activity}
150
151 {:fake, true, map, recipients} ->
152 activity = %Activity{
153 data: map,
154 local: local,
155 actor: map["actor"],
156 recipients: recipients,
157 id: "pleroma:fakeid"
158 }
159
160 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
161 {:ok, activity}
162
163 error ->
164 {:error, error}
165 end
166 end
167
168 def notify_and_stream(activity) do
169 Notification.create_notifications(activity)
170
171 conversation = create_or_bump_conversation(activity, activity.actor)
172 participations = get_participations(conversation)
173 stream_out(activity)
174 stream_out_participations(participations)
175 end
176
177 defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
178 with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
179 {:ok, activity}
180 end
181 end
182
183 defp maybe_create_activity_expiration(result), do: result
184
185 defp create_or_bump_conversation(activity, actor) do
186 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
187 %User{} = user <- User.get_cached_by_ap_id(actor) do
188 Participation.mark_as_read(user, conversation)
189 {:ok, conversation}
190 end
191 end
192
193 defp get_participations({:ok, conversation}) do
194 conversation
195 |> Repo.preload(:participations, force: true)
196 |> Map.get(:participations)
197 end
198
199 defp get_participations(_), do: []
200
201 def stream_out_participations(participations) do
202 participations =
203 participations
204 |> Repo.preload(:user)
205
206 Streamer.stream("participation", participations)
207 end
208
209 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
210 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
211 conversation = Repo.preload(conversation, :participations)
212
213 last_activity_id =
214 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
215 user: user,
216 blocking_user: user
217 })
218
219 if last_activity_id do
220 stream_out_participations(conversation.participations)
221 end
222 end
223 end
224
225 def stream_out_participations(_, _), do: :noop
226
227 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
228 when data_type in ["Create", "Announce", "Delete"] do
229 activity
230 |> Topics.get_activity_topics()
231 |> Streamer.stream(activity)
232 end
233
234 def stream_out(_activity) do
235 :noop
236 end
237
238 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
239 def create(params, fake \\ false) do
240 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
241 result
242 end
243 end
244
245 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
246 additional = params[:additional] || %{}
247 # only accept false as false value
248 local = !(params[:local] == false)
249 published = params[:published]
250 quick_insert? = Config.get([:env]) == :benchmark
251
252 create_data =
253 make_create_data(
254 %{to: to, actor: actor, published: published, context: context, object: object},
255 additional
256 )
257
258 with {:ok, activity} <- insert(create_data, local, fake),
259 {:fake, false, activity} <- {:fake, fake, activity},
260 _ <- increase_replies_count_if_reply(create_data),
261 _ <- increase_poll_votes_if_vote(create_data),
262 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
263 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
264 _ <- notify_and_stream(activity),
265 :ok <- maybe_federate(activity) do
266 {:ok, activity}
267 else
268 {:quick_insert, true, activity} ->
269 {:ok, activity}
270
271 {:fake, true, activity} ->
272 {:ok, activity}
273
274 {:error, message} ->
275 Repo.rollback(message)
276 end
277 end
278
279 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
280 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
281 additional = params[:additional] || %{}
282 # only accept false as false value
283 local = !(params[:local] == false)
284 published = params[:published]
285
286 listen_data =
287 make_listen_data(
288 %{to: to, actor: actor, published: published, context: context, object: object},
289 additional
290 )
291
292 with {:ok, activity} <- insert(listen_data, local),
293 _ <- notify_and_stream(activity),
294 :ok <- maybe_federate(activity) do
295 {:ok, activity}
296 end
297 end
298
299 @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()}
300 def accept(params) do
301 accept_or_reject("Accept", params)
302 end
303
304 @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()}
305 def reject(params) do
306 accept_or_reject("Reject", params)
307 end
308
309 @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()}
310 defp accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
311 local = Map.get(params, :local, true)
312 activity_id = Map.get(params, :activity_id, nil)
313
314 data =
315 %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
316 |> Maps.put_if_present("id", activity_id)
317
318 with {:ok, activity} <- insert(data, local),
319 _ <- notify_and_stream(activity),
320 :ok <- maybe_federate(activity) do
321 {:ok, activity}
322 end
323 end
324
325 @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) ::
326 {:ok, Activity.t()} | {:error, any()}
327 def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do
328 with {:ok, result} <-
329 Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do
330 result
331 end
332 end
333
334 defp do_follow(follower, followed, activity_id, local, opts) do
335 skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false)
336 data = make_follow_data(follower, followed, activity_id)
337
338 with {:ok, activity} <- insert(data, local),
339 _ <- skip_notify_and_stream || notify_and_stream(activity),
340 :ok <- maybe_federate(activity) do
341 {:ok, activity}
342 else
343 {:error, error} -> Repo.rollback(error)
344 end
345 end
346
347 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
348 {:ok, Activity.t()} | nil | {:error, any()}
349 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
350 with {:ok, result} <-
351 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
352 result
353 end
354 end
355
356 defp do_unfollow(follower, followed, activity_id, local) do
357 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
358 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
359 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
360 {:ok, activity} <- insert(unfollow_data, local),
361 _ <- notify_and_stream(activity),
362 :ok <- maybe_federate(activity) do
363 {:ok, activity}
364 else
365 nil -> nil
366 {:error, error} -> Repo.rollback(error)
367 end
368 end
369
370 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
371 def flag(
372 %{
373 actor: actor,
374 context: _context,
375 account: account,
376 statuses: statuses,
377 content: content
378 } = params
379 ) do
380 # only accept false as false value
381 local = !(params[:local] == false)
382 forward = !(params[:forward] == false)
383
384 additional = params[:additional] || %{}
385
386 additional =
387 if forward do
388 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
389 else
390 Map.merge(additional, %{"to" => [], "cc" => []})
391 end
392
393 with flag_data <- make_flag_data(params, additional),
394 {:ok, activity} <- insert(flag_data, local),
395 {:ok, stripped_activity} <- strip_report_status_data(activity),
396 _ <- notify_and_stream(activity),
397 :ok <- maybe_federate(stripped_activity) do
398 User.all_superusers()
399 |> Enum.filter(fn user -> not is_nil(user.email) end)
400 |> Enum.each(fn superuser ->
401 superuser
402 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
403 |> Pleroma.Emails.Mailer.deliver_async()
404 end)
405
406 {:ok, activity}
407 end
408 end
409
410 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
411 def move(%User{} = origin, %User{} = target, local \\ true) do
412 params = %{
413 "type" => "Move",
414 "actor" => origin.ap_id,
415 "object" => origin.ap_id,
416 "target" => target.ap_id
417 }
418
419 with true <- origin.ap_id in target.also_known_as,
420 {:ok, activity} <- insert(params, local),
421 _ <- notify_and_stream(activity) do
422 maybe_federate(activity)
423
424 BackgroundWorker.enqueue("move_following", %{
425 "origin_id" => origin.id,
426 "target_id" => target.id
427 })
428
429 {:ok, activity}
430 else
431 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
432 err -> err
433 end
434 end
435
436 def fetch_activities_for_context_query(context, opts) do
437 public = [Constants.as_public()]
438
439 recipients =
440 if opts[:user],
441 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
442 else: public
443
444 from(activity in Activity)
445 |> maybe_preload_objects(opts)
446 |> maybe_preload_bookmarks(opts)
447 |> maybe_set_thread_muted_field(opts)
448 |> restrict_blocked(opts)
449 |> restrict_recipients(recipients, opts[:user])
450 |> where(
451 [activity],
452 fragment(
453 "?->>'type' = ? and ?->>'context' = ?",
454 activity.data,
455 "Create",
456 activity.data,
457 ^context
458 )
459 )
460 |> exclude_poll_votes(opts)
461 |> exclude_id(opts)
462 |> order_by([activity], desc: activity.id)
463 end
464
465 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
466 def fetch_activities_for_context(context, opts \\ %{}) do
467 context
468 |> fetch_activities_for_context_query(opts)
469 |> Repo.all()
470 end
471
472 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
473 FlakeId.Ecto.CompatType.t() | nil
474 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
475 context
476 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
477 |> restrict_visibility(%{visibility: "direct"})
478 |> limit(1)
479 |> select([a], a.id)
480 |> Repo.one()
481 end
482
483 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
484 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
485 opts = Map.delete(opts, :user)
486
487 [Constants.as_public()]
488 |> fetch_activities_query(opts)
489 |> restrict_unlisted(opts)
490 |> Pagination.fetch_paginated(opts, pagination)
491 end
492
493 @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
494 def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
495 opts
496 |> Map.put(:restrict_unlisted, true)
497 |> fetch_public_or_unlisted_activities(pagination)
498 end
499
500 @valid_visibilities ~w[direct unlisted public private]
501
502 defp restrict_visibility(query, %{visibility: visibility})
503 when is_list(visibility) do
504 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
505 from(
506 a in query,
507 where:
508 fragment(
509 "activity_visibility(?, ?, ?) = ANY (?)",
510 a.actor,
511 a.recipients,
512 a.data,
513 ^visibility
514 )
515 )
516 else
517 Logger.error("Could not restrict visibility to #{visibility}")
518 end
519 end
520
521 defp restrict_visibility(query, %{visibility: visibility})
522 when visibility in @valid_visibilities do
523 from(
524 a in query,
525 where:
526 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
527 )
528 end
529
530 defp restrict_visibility(_query, %{visibility: visibility})
531 when visibility not in @valid_visibilities do
532 Logger.error("Could not restrict visibility to #{visibility}")
533 end
534
535 defp restrict_visibility(query, _visibility), do: query
536
537 defp exclude_visibility(query, %{exclude_visibilities: visibility})
538 when is_list(visibility) do
539 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
540 from(
541 a in query,
542 where:
543 not fragment(
544 "activity_visibility(?, ?, ?) = ANY (?)",
545 a.actor,
546 a.recipients,
547 a.data,
548 ^visibility
549 )
550 )
551 else
552 Logger.error("Could not exclude visibility to #{visibility}")
553 query
554 end
555 end
556
557 defp exclude_visibility(query, %{exclude_visibilities: visibility})
558 when visibility in @valid_visibilities do
559 from(
560 a in query,
561 where:
562 not fragment(
563 "activity_visibility(?, ?, ?) = ?",
564 a.actor,
565 a.recipients,
566 a.data,
567 ^visibility
568 )
569 )
570 end
571
572 defp exclude_visibility(query, %{exclude_visibilities: visibility})
573 when visibility not in [nil | @valid_visibilities] do
574 Logger.error("Could not exclude visibility to #{visibility}")
575 query
576 end
577
578 defp exclude_visibility(query, _visibility), do: query
579
580 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
581 do: query
582
583 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
584 do: query
585
586 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
587 from(
588 a in query,
589 where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
590 )
591 end
592
593 defp restrict_thread_visibility(query, _, _), do: query
594
595 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
596 params =
597 params
598 |> Map.put(:user, reading_user)
599 |> Map.put(:actor_id, user.ap_id)
600
601 %{
602 godmode: params[:godmode],
603 reading_user: reading_user
604 }
605 |> user_activities_recipients()
606 |> fetch_activities(params)
607 |> Enum.reverse()
608 end
609
610 def fetch_user_activities(user, reading_user, params \\ %{}) do
611 params =
612 params
613 |> Map.put(:type, ["Create", "Announce"])
614 |> Map.put(:user, reading_user)
615 |> Map.put(:actor_id, user.ap_id)
616 |> Map.put(:pinned_activity_ids, user.pinned_activities)
617
618 params =
619 if User.blocks?(reading_user, user) do
620 params
621 else
622 params
623 |> Map.put(:blocking_user, reading_user)
624 |> Map.put(:muting_user, reading_user)
625 end
626
627 %{
628 godmode: params[:godmode],
629 reading_user: reading_user
630 }
631 |> user_activities_recipients()
632 |> fetch_activities(params)
633 |> Enum.reverse()
634 end
635
636 def fetch_statuses(reading_user, params) do
637 params = Map.put(params, :type, ["Create", "Announce"])
638
639 %{
640 godmode: params[:godmode],
641 reading_user: reading_user
642 }
643 |> user_activities_recipients()
644 |> fetch_activities(params, :offset)
645 |> Enum.reverse()
646 end
647
648 defp user_activities_recipients(%{godmode: true}), do: []
649
650 defp user_activities_recipients(%{reading_user: reading_user}) do
651 if reading_user do
652 [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
653 else
654 [Constants.as_public()]
655 end
656 end
657
658 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
659 raise "Can't use the child object without preloading!"
660 end
661
662 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
663 from(
664 [activity, object] in query,
665 where:
666 fragment(
667 "?->>'type' != ? or ?->>'actor' != ?",
668 activity.data,
669 "Announce",
670 object.data,
671 ^actor
672 )
673 )
674 end
675
676 defp restrict_announce_object_actor(query, _), do: query
677
678 defp restrict_since(query, %{since_id: ""}), do: query
679
680 defp restrict_since(query, %{since_id: since_id}) do
681 from(activity in query, where: activity.id > ^since_id)
682 end
683
684 defp restrict_since(query, _), do: query
685
686 defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
687 raise "Can't use the child object without preloading!"
688 end
689
690 defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
691 from(
692 [_activity, object] in query,
693 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
694 )
695 end
696
697 defp restrict_tag_reject(query, _), do: query
698
699 defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
700 raise "Can't use the child object without preloading!"
701 end
702
703 defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
704 from(
705 [_activity, object] in query,
706 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
707 )
708 end
709
710 defp restrict_tag_all(query, _), do: query
711
712 defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
713 raise "Can't use the child object without preloading!"
714 end
715
716 defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
717 from(
718 [_activity, object] in query,
719 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
720 )
721 end
722
723 defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
724 from(
725 [_activity, object] in query,
726 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
727 )
728 end
729
730 defp restrict_tag(query, _), do: query
731
732 defp restrict_recipients(query, [], _user), do: query
733
734 defp restrict_recipients(query, recipients, nil) do
735 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
736 end
737
738 defp restrict_recipients(query, recipients, user) do
739 from(
740 activity in query,
741 where: fragment("? && ?", ^recipients, activity.recipients),
742 or_where: activity.actor == ^user.ap_id
743 )
744 end
745
746 defp restrict_local(query, %{local_only: true}) do
747 from(activity in query, where: activity.local == true)
748 end
749
750 defp restrict_local(query, _), do: query
751
752 defp restrict_actor(query, %{actor_id: actor_id}) do
753 from(activity in query, where: activity.actor == ^actor_id)
754 end
755
756 defp restrict_actor(query, _), do: query
757
758 defp restrict_type(query, %{type: type}) when is_binary(type) do
759 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
760 end
761
762 defp restrict_type(query, %{type: type}) do
763 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
764 end
765
766 defp restrict_type(query, _), do: query
767
768 defp restrict_state(query, %{state: state}) do
769 from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
770 end
771
772 defp restrict_state(query, _), do: query
773
774 defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
775 from(
776 [_activity, object] in query,
777 where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
778 )
779 end
780
781 defp restrict_favorited_by(query, _), do: query
782
783 defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
784 raise "Can't use the child object without preloading!"
785 end
786
787 defp restrict_media(query, %{only_media: true}) do
788 from(
789 [activity, object] in query,
790 where: fragment("(?)->>'type' = ?", activity.data, "Create"),
791 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
792 )
793 end
794
795 defp restrict_media(query, _), do: query
796
797 defp restrict_replies(query, %{exclude_replies: true}) do
798 from(
799 [_activity, object] in query,
800 where: fragment("?->>'inReplyTo' is null", object.data)
801 )
802 end
803
804 defp restrict_replies(query, %{
805 reply_filtering_user: user,
806 reply_visibility: "self"
807 }) do
808 from(
809 [activity, object] in query,
810 where:
811 fragment(
812 "?->>'inReplyTo' is null OR ? = ANY(?)",
813 object.data,
814 ^user.ap_id,
815 activity.recipients
816 )
817 )
818 end
819
820 defp restrict_replies(query, %{
821 reply_filtering_user: user,
822 reply_visibility: "following"
823 }) do
824 from(
825 [activity, object] in query,
826 where:
827 fragment(
828 "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
829 object.data,
830 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
831 activity.recipients,
832 activity.actor,
833 activity.actor,
834 ^user.ap_id
835 )
836 )
837 end
838
839 defp restrict_replies(query, _), do: query
840
841 defp restrict_reblogs(query, %{exclude_reblogs: true}) do
842 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
843 end
844
845 defp restrict_reblogs(query, _), do: query
846
847 defp restrict_muted(query, %{with_muted: true}), do: query
848
849 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
850 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
851
852 query =
853 from([activity] in query,
854 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
855 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
856 )
857
858 unless opts[:skip_preload] do
859 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
860 else
861 query
862 end
863 end
864
865 defp restrict_muted(query, _), do: query
866
867 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
868 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
869 domain_blocks = user.domain_blocks || []
870
871 following_ap_ids = User.get_friends_ap_ids(user)
872
873 query =
874 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
875
876 from(
877 [activity, object: o] in query,
878 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
879 where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids),
880 where:
881 fragment(
882 "recipients_contain_blocked_domains(?, ?) = false",
883 activity.recipients,
884 ^domain_blocks
885 ),
886 where:
887 fragment(
888 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
889 activity.data,
890 activity.data,
891 ^blocked_ap_ids
892 ),
893 where:
894 fragment(
895 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
896 activity.actor,
897 ^domain_blocks,
898 activity.actor,
899 ^following_ap_ids
900 ),
901 where:
902 fragment(
903 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
904 o.data,
905 ^domain_blocks,
906 o.data,
907 ^following_ap_ids
908 )
909 )
910 end
911
912 defp restrict_blocked(query, _), do: query
913
914 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
915 from(
916 activity in query,
917 where:
918 fragment(
919 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
920 activity.data,
921 ^[Constants.as_public()]
922 )
923 )
924 end
925
926 defp restrict_unlisted(query, _), do: query
927
928 defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
929 from(activity in query, where: activity.id in ^ids)
930 end
931
932 defp restrict_pinned(query, _), do: query
933
934 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
935 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
936
937 from(
938 activity in query,
939 where:
940 fragment(
941 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
942 activity.data,
943 activity.actor,
944 ^muted_reblogs
945 )
946 )
947 end
948
949 defp restrict_muted_reblogs(query, _), do: query
950
951 defp restrict_instance(query, %{instance: instance}) do
952 users =
953 from(
954 u in User,
955 select: u.ap_id,
956 where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}")
957 )
958 |> Repo.all()
959
960 from(activity in query, where: activity.actor in ^users)
961 end
962
963 defp restrict_instance(query, _), do: query
964
965 defp restrict_filtered(query, %{user: %User{} = user}) do
966 case Filter.compose_regex(user) do
967 nil ->
968 query
969
970 regex ->
971 from([activity, object] in query,
972 where:
973 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
974 activity.actor == ^user.ap_id
975 )
976 end
977 end
978
979 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
980 restrict_filtered(query, %{user: user})
981 end
982
983 defp restrict_filtered(query, _), do: query
984
985 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
986
987 defp exclude_poll_votes(query, _) do
988 if has_named_binding?(query, :object) do
989 from([activity, object: o] in query,
990 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
991 )
992 else
993 query
994 end
995 end
996
997 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
998
999 defp exclude_chat_messages(query, _) do
1000 if has_named_binding?(query, :object) do
1001 from([activity, object: o] in query,
1002 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1003 )
1004 else
1005 query
1006 end
1007 end
1008
1009 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1010
1011 defp exclude_invisible_actors(query, _opts) do
1012 invisible_ap_ids =
1013 User.Query.build(%{invisible: true, select: [:ap_id]})
1014 |> Repo.all()
1015 |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
1016
1017 from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
1018 end
1019
1020 defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1021 from(activity in query, where: activity.id != ^id)
1022 end
1023
1024 defp exclude_id(query, _), do: query
1025
1026 defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1027
1028 defp maybe_preload_objects(query, _) do
1029 query
1030 |> Activity.with_preloaded_object()
1031 end
1032
1033 defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1034
1035 defp maybe_preload_bookmarks(query, opts) do
1036 query
1037 |> Activity.with_preloaded_bookmark(opts[:user])
1038 end
1039
1040 defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1041 query
1042 |> Activity.with_preloaded_report_notes()
1043 end
1044
1045 defp maybe_preload_report_notes(query, _), do: query
1046
1047 defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1048
1049 defp maybe_set_thread_muted_field(query, opts) do
1050 query
1051 |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1052 end
1053
1054 defp maybe_order(query, %{order: :desc}) do
1055 query
1056 |> order_by(desc: :id)
1057 end
1058
1059 defp maybe_order(query, %{order: :asc}) do
1060 query
1061 |> order_by(asc: :id)
1062 end
1063
1064 defp maybe_order(query, _), do: query
1065
1066 defp fetch_activities_query_ap_ids_ops(opts) do
1067 source_user = opts[:muting_user]
1068 ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1069
1070 ap_id_relationships =
1071 if opts[:blocking_user] && opts[:blocking_user] == source_user do
1072 [:block | ap_id_relationships]
1073 else
1074 ap_id_relationships
1075 end
1076
1077 preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1078
1079 restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1080 restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1081
1082 restrict_muted_reblogs_opts =
1083 Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1084
1085 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1086 end
1087
1088 def fetch_activities_query(recipients, opts \\ %{}) do
1089 {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1090 fetch_activities_query_ap_ids_ops(opts)
1091
1092 config = %{
1093 skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1094 }
1095
1096 Activity
1097 |> maybe_preload_objects(opts)
1098 |> maybe_preload_bookmarks(opts)
1099 |> maybe_preload_report_notes(opts)
1100 |> maybe_set_thread_muted_field(opts)
1101 |> maybe_order(opts)
1102 |> restrict_recipients(recipients, opts[:user])
1103 |> restrict_replies(opts)
1104 |> restrict_tag(opts)
1105 |> restrict_tag_reject(opts)
1106 |> restrict_tag_all(opts)
1107 |> restrict_since(opts)
1108 |> restrict_local(opts)
1109 |> restrict_actor(opts)
1110 |> restrict_type(opts)
1111 |> restrict_state(opts)
1112 |> restrict_favorited_by(opts)
1113 |> restrict_blocked(restrict_blocked_opts)
1114 |> restrict_muted(restrict_muted_opts)
1115 |> restrict_media(opts)
1116 |> restrict_visibility(opts)
1117 |> restrict_thread_visibility(opts, config)
1118 |> restrict_reblogs(opts)
1119 |> restrict_pinned(opts)
1120 |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1121 |> restrict_instance(opts)
1122 |> restrict_announce_object_actor(opts)
1123 |> restrict_filtered(opts)
1124 |> Activity.restrict_deactivated_users()
1125 |> exclude_poll_votes(opts)
1126 |> exclude_chat_messages(opts)
1127 |> exclude_invisible_actors(opts)
1128 |> exclude_visibility(opts)
1129 end
1130
1131 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
1132 list_memberships = Pleroma.List.memberships(opts[:user])
1133
1134 fetch_activities_query(recipients ++ list_memberships, opts)
1135 |> Pagination.fetch_paginated(opts, pagination)
1136 |> Enum.reverse()
1137 |> maybe_update_cc(list_memberships, opts[:user])
1138 end
1139
1140 @doc """
1141 Fetch favorites activities of user with order by sort adds to favorites
1142 """
1143 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1144 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1145 user.ap_id
1146 |> Activity.Queries.by_actor()
1147 |> Activity.Queries.by_type("Like")
1148 |> Activity.with_joined_object()
1149 |> Object.with_joined_activity()
1150 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1151 |> order_by([like, _, _], desc_nulls_last: like.id)
1152 |> Pagination.fetch_paginated(
1153 Map.merge(params, %{skip_order: true}),
1154 pagination
1155 )
1156 end
1157
1158 defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1159 Enum.map(activities, fn
1160 %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1161 if Enum.any?(bcc, &(&1 in list_memberships)) do
1162 update_in(activity.data["cc"], &[user_ap_id | &1])
1163 else
1164 activity
1165 end
1166
1167 activity ->
1168 activity
1169 end)
1170 end
1171
1172 defp maybe_update_cc(activities, _, _), do: activities
1173
1174 defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1175 from(activity in query,
1176 where:
1177 fragment("? && ?", activity.recipients, ^recipients) or
1178 (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1179 ^Constants.as_public() in activity.recipients)
1180 )
1181 end
1182
1183 def fetch_activities_bounded(
1184 recipients,
1185 recipients_with_public,
1186 opts \\ %{},
1187 pagination \\ :keyset
1188 ) do
1189 fetch_activities_query([], opts)
1190 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1191 |> Pagination.fetch_paginated(opts, pagination)
1192 |> Enum.reverse()
1193 end
1194
1195 @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1196 def upload(file, opts \\ []) do
1197 with {:ok, data} <- Upload.store(file, opts) do
1198 obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1199
1200 Repo.insert(%Object{data: obj_data})
1201 end
1202 end
1203
1204 @spec get_actor_url(any()) :: binary() | nil
1205 defp get_actor_url(url) when is_binary(url), do: url
1206 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1207
1208 defp get_actor_url(url) when is_list(url) do
1209 url
1210 |> List.first()
1211 |> get_actor_url()
1212 end
1213
1214 defp get_actor_url(_url), do: nil
1215
1216 defp object_to_user_data(data) do
1217 avatar =
1218 data["icon"]["url"] &&
1219 %{
1220 "type" => "Image",
1221 "url" => [%{"href" => data["icon"]["url"]}]
1222 }
1223
1224 banner =
1225 data["image"]["url"] &&
1226 %{
1227 "type" => "Image",
1228 "url" => [%{"href" => data["image"]["url"]}]
1229 }
1230
1231 fields =
1232 data
1233 |> Map.get("attachment", [])
1234 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1235 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1236
1237 emojis =
1238 data
1239 |> Map.get("tag", [])
1240 |> Enum.filter(fn
1241 %{"type" => "Emoji"} -> true
1242 _ -> false
1243 end)
1244 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1245 {String.trim(name, ":"), url}
1246 end)
1247
1248 locked = data["manuallyApprovesFollowers"] || false
1249 data = Transmogrifier.maybe_fix_user_object(data)
1250 discoverable = data["discoverable"] || false
1251 invisible = data["invisible"] || false
1252 actor_type = data["type"] || "Person"
1253
1254 public_key =
1255 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1256 data["publicKey"]["publicKeyPem"]
1257 else
1258 nil
1259 end
1260
1261 shared_inbox =
1262 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1263 data["endpoints"]["sharedInbox"]
1264 else
1265 nil
1266 end
1267
1268 user_data = %{
1269 ap_id: data["id"],
1270 uri: get_actor_url(data["url"]),
1271 ap_enabled: true,
1272 banner: banner,
1273 fields: fields,
1274 emoji: emojis,
1275 locked: locked,
1276 discoverable: discoverable,
1277 invisible: invisible,
1278 avatar: avatar,
1279 name: data["name"],
1280 follower_address: data["followers"],
1281 following_address: data["following"],
1282 bio: data["summary"],
1283 actor_type: actor_type,
1284 also_known_as: Map.get(data, "alsoKnownAs", []),
1285 public_key: public_key,
1286 inbox: data["inbox"],
1287 shared_inbox: shared_inbox
1288 }
1289
1290 # nickname can be nil because of virtual actors
1291 if data["preferredUsername"] do
1292 Map.put(
1293 user_data,
1294 :nickname,
1295 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
1296 )
1297 else
1298 Map.put(user_data, :nickname, nil)
1299 end
1300 end
1301
1302 def fetch_follow_information_for_user(user) do
1303 with {:ok, following_data} <-
1304 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1305 {:ok, hide_follows} <- collection_private(following_data),
1306 {:ok, followers_data} <-
1307 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1308 {:ok, hide_followers} <- collection_private(followers_data) do
1309 {:ok,
1310 %{
1311 hide_follows: hide_follows,
1312 follower_count: normalize_counter(followers_data["totalItems"]),
1313 following_count: normalize_counter(following_data["totalItems"]),
1314 hide_followers: hide_followers
1315 }}
1316 else
1317 {:error, _} = e -> e
1318 e -> {:error, e}
1319 end
1320 end
1321
1322 defp normalize_counter(counter) when is_integer(counter), do: counter
1323 defp normalize_counter(_), do: 0
1324
1325 def maybe_update_follow_information(user_data) do
1326 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1327 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1328 {_, true} <-
1329 {:collections_available,
1330 !!(user_data[:following_address] && user_data[:follower_address])},
1331 {:ok, info} <-
1332 fetch_follow_information_for_user(user_data) do
1333 info = Map.merge(user_data[:info] || %{}, info)
1334
1335 user_data
1336 |> Map.put(:info, info)
1337 else
1338 {:user_type_check, false} ->
1339 user_data
1340
1341 {:collections_available, false} ->
1342 user_data
1343
1344 {:enabled, false} ->
1345 user_data
1346
1347 e ->
1348 Logger.error(
1349 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1350 )
1351
1352 user_data
1353 end
1354 end
1355
1356 defp collection_private(%{"first" => %{"type" => type}})
1357 when type in ["CollectionPage", "OrderedCollectionPage"],
1358 do: {:ok, false}
1359
1360 defp collection_private(%{"first" => first}) do
1361 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1362 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1363 {:ok, false}
1364 else
1365 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1366 {:error, _} = e -> e
1367 e -> {:error, e}
1368 end
1369 end
1370
1371 defp collection_private(_data), do: {:ok, true}
1372
1373 def user_data_from_user_object(data) do
1374 with {:ok, data} <- MRF.filter(data) do
1375 {:ok, object_to_user_data(data)}
1376 else
1377 e -> {:error, e}
1378 end
1379 end
1380
1381 def fetch_and_prepare_user_from_ap_id(ap_id) do
1382 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1383 {:ok, data} <- user_data_from_user_object(data) do
1384 {:ok, maybe_update_follow_information(data)}
1385 else
1386 {:error, "Object has been deleted" = e} ->
1387 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1388 {:error, e}
1389
1390 {:error, e} ->
1391 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1392 {:error, e}
1393 end
1394 end
1395
1396 def maybe_handle_clashing_nickname(nickname) do
1397 with %User{} = old_user <- User.get_by_nickname(nickname) do
1398 Logger.info("Found an old user for #{nickname}, ap id is #{old_user.ap_id}, renaming.")
1399
1400 old_user
1401 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1402 |> User.update_and_set_cache()
1403 end
1404 end
1405
1406 def make_user_from_ap_id(ap_id) do
1407 user = User.get_cached_by_ap_id(ap_id)
1408
1409 if user && !User.ap_enabled?(user) do
1410 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1411 else
1412 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
1413 if user do
1414 user
1415 |> User.remote_user_changeset(data)
1416 |> User.update_and_set_cache()
1417 else
1418 maybe_handle_clashing_nickname(data[:nickname])
1419
1420 data
1421 |> User.remote_user_changeset()
1422 |> Repo.insert()
1423 |> User.set_cache()
1424 end
1425 end
1426 end
1427 end
1428
1429 def make_user_from_nickname(nickname) do
1430 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
1431 make_user_from_ap_id(ap_id)
1432 else
1433 _e -> {:error, "No AP id in WebFinger"}
1434 end
1435 end
1436
1437 # filter out broken threads
1438 defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1439 entire_thread_visible_for_user?(activity, user)
1440 end
1441
1442 # do post-processing on a specific activity
1443 def contain_activity(%Activity{} = activity, %User{} = user) do
1444 contain_broken_threads(activity, user)
1445 end
1446
1447 def fetch_direct_messages_query do
1448 Activity
1449 |> restrict_type(%{type: "Create"})
1450 |> restrict_visibility(%{visibility: "direct"})
1451 |> order_by([activity], asc: activity.id)
1452 end
1453 end