reject activity creation
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Filter
13 alias Pleroma.Maps
14 alias Pleroma.Notification
15 alias Pleroma.Object
16 alias Pleroma.Object.Containment
17 alias Pleroma.Object.Fetcher
18 alias Pleroma.Pagination
19 alias Pleroma.Repo
20 alias Pleroma.Upload
21 alias Pleroma.User
22 alias Pleroma.Web.ActivityPub.MRF
23 alias Pleroma.Web.ActivityPub.Transmogrifier
24 alias Pleroma.Web.Streamer
25 alias Pleroma.Web.WebFinger
26 alias Pleroma.Workers.BackgroundWorker
27
28 import Ecto.Query
29 import Pleroma.Web.ActivityPub.Utils
30 import Pleroma.Web.ActivityPub.Visibility
31
32 require Logger
33 require Pleroma.Constants
34
# Builds `{recipients, to, cc}` from an activity's addressing fields.
# For "Create" activities the actor is appended and duplicates are removed;
# every other activity type gets the plain concatenation of to/cc/bcc.
defp get_recipients(%{"type" => "Create"} = data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))
  author = Map.get(data, "actor", [])

  everyone = Enum.uniq(Enum.concat([to, cc, bcc, [author]]))
  {everyone, to, cc}
end

defp get_recipients(data) do
  [to, cc, bcc] = Enum.map(["to", "cc", "bcc"], &Map.get(data, &1, []))

  {Enum.concat([to, cc, bcc]), to, cc}
end
51
# An absent actor passes the check; otherwise the actor must resolve to a
# cached user whose `deactivated` flag is false.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) when is_binary(actor) do
  with %User{deactivated: deactivated} <- User.get_cached_by_ap_id(actor) do
    not deactivated
  else
    _ -> false
  end
end
60
# True when the object's content is within the configured
# `[:instance, :remote_limit]` length; objects without content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
67
@doc """
Increases the actor's note count when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
71
@doc """
Decreases the actor's note count when `object` is public.

Returns `{:ok, actor}` unchanged for non-public objects.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
75
# Bumps the replies counter of the parent object when a public reply is
# created. Returns nil for non-public replies and :noop for anything that is
# not a "Create" carrying an "inReplyTo".
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
86
@object_types ~w[ChatMessage Question Answer Audio Event]

@doc """
Persists `object` and returns it together with the untouched `meta`.

Maps whose "type" is one of `@object_types` are stored through
`Object.create/1`; everything else is inserted as an `Activity` whose
recipients are derived from the to/cc/bcc addressing.

`meta` must contain a `:local` key when an activity is inserted
(`Keyword.fetch!/2` raises `KeyError` otherwise). Any non-`{:ok, _}` result of
the underlying insert is returned as-is.
"""
# The success value is a 3-tuple: the stored record plus the meta keyword list.
@spec persist(map(), keyword()) ::
        {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def persist(%{"type" => type} = object, meta) when type in @object_types do
  with {:ok, object} <- Object.create(object) do
    {:ok, object, meta}
  end
end

def persist(object, meta) do
  with local <- Keyword.fetch!(meta, :local),
       {recipients, _, _} <- get_recipients(object),
       {:ok, activity} <-
         Repo.insert(%Activity{
           data: object,
           local: local,
           recipients: recipients,
           actor: object["actor"]
         }) do
    {:ok, activity, meta}
  end
end
108
@doc """
Inserts an activity built from `map`.

If `Activity.normalize/1` already finds an activity for `map`, that activity
is returned. Otherwise the data passes, in order, the actor check (skipped
when `bypass_actor_check` is set), the remote length limit, the MRF filter and
the containment check before the object and the activity are stored.

With `fake: true` nothing is stored; an in-memory activity with the id
"pleroma:fakeid" is returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data <- lazy_put_activity_defaults(map, fake),
       {_, true} <-
         {:actor_check, bypass_actor_check || check_actor_is_active(data["actor"])},
       {_, true} <- {:remote_limit_pass, check_remote_limit(data)},
       {:ok, filtered} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored, object} <- insert_full_object(filtered),
       {:ok, inserted} <- insert_activity_with_expiration(stored, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    {:ok, activity}
  else
    # The activity already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        id: "pleroma:fakeid",
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:reject, reason} ->
      {:error, reason}

    {:containment, _} = failure ->
      failure

    {:error, _} = failure ->
      failure
  end
end
159
# Inserts the activity row and, when the data carries "expires_at", schedules
# its purge via maybe_create_activity_expiration/1.
defp insert_activity_with_expiration(data, local, recipients) do
  activity = %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }

  maybe_create_activity_expiration(Repo.insert(activity))
end
170
@doc """
Creates notifications for `activity`, creates or bumps its conversation, then
streams out the activity and the conversation's participations.
"""
def notify_and_stream(activity) do
  Notification.create_notifications(activity)

  participations =
    activity
    |> create_or_bump_conversation(activity.actor)
    |> get_participations()

  stream_out(activity)
  stream_out_participations(participations)
end
179
# After a successful insert of an activity whose data has "expires_at",
# enqueues a PurgeExpiredActivity job. Any other input is passed through, as
# is a failed enqueue result.
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(result), do: result
191
# Creates or bumps the conversation for the activity and marks it as read for
# the acting user. Returns {:ok, conversation}, or whichever intermediate
# result did not match.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    failure ->
      failure
  end
end
199
# Force-reloads and returns the participations of a conversation; anything
# other than {:ok, conversation} yields an empty list.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
207
@doc """
Preloads the user of each participation and streams them on the
"participation" topic.
"""
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

@doc """
Streams the participations of the conversation matching the object's context,
but only when a direct activity visible to `user` exists in that context.

Returns `:noop` when the first argument is not an object with a "context".
"""
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{user: user, blocking_user: user}

      if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
233
@doc """
Streams Create, Announce and Delete activities to their topics; every other
input is a `:noop`.
"""
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ~w[Create Announce Delete] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
244
@doc """
Runs the Create flow inside a database transaction and unwraps its result.
A failed transaction is returned as-is.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
251
# Transaction body of create/2: inserts the Create activity, updates reply and
# note counters, notifies/streams and federates. Fake inserts and benchmark
# runs (`Config.get([:env]) == :benchmark`) stop early with {:ok, activity};
# an {:error, _} rolls the transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  # only an explicit `false` makes the activity non-local
  local = params[:local] != false
  benchmark? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      params[:additional] || %{}
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, benchmark?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} ->
      Repo.rollback(message)

    {tag, true, activity} when tag in [:fake, :quick_insert] ->
      {:ok, activity}
  end
end
284
@doc """
Builds and inserts a Listen activity, then notifies, streams and federates it.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  # only an explicit `false` makes the activity non-local
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      params[:additional] || %{}
    )

  case insert(listen_data, local) do
    {:ok, activity} ->
      notify_and_stream(activity)

      with :ok <- maybe_federate(activity) do
        {:ok, activity}
      end

    failure ->
      failure
  end
end
304
@doc """
Runs the unfollow flow inside a database transaction and unwraps its result.
A failed transaction is returned as-is.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

  case transaction do
    {:ok, result} -> result
    failure -> failure
  end
end
313
# Transaction body of unfollow/4: marks the latest follow as "cancelled",
# inserts the unfollow activity, then notifies, streams and federates it.
# Returns nil when there is no follow to undo; rolls back on {:error, _}.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, undo} <- insert(undo_data, local),
       _ <- notify_and_stream(undo),
       :ok <- maybe_federate(undo) do
    {:ok, undo}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
327
@doc """
Creates a Flag (report) activity.

The activity is addressed to nobody; the reported account is cc'ed unless
`params[:forward]` is explicitly `false`. After insertion the report is
streamed, a stripped copy is federated, and every superuser with an email
address receives a report email.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only an explicit `false` disables these flags
  local = params[:local] != false
  forward? = params[:forward] != false

  cc = if forward?, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})
  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    for superuser <- User.all_superusers(), not is_nil(superuser.email) do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
367
@doc """
Creates a Move activity from `origin` to `target`.

The target must list the origin's AP id in `also_known_as`. On success the
activity is streamed and federated and a "move_following" background job is
enqueued.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  move_data = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id
  }

  if Enum.member?(target.also_known_as, origin.ap_id) do
    case insert(move_data, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
393
@doc """
Builds the query for the Create activities of `context`, newest first.

Results are limited to what `opts[:user]` may see: public activities, plus
those addressed to the user or to accounts the user follows.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    if opts[:user] do
      [opts[:user].ap_id | User.following(opts[:user])] ++ public
    else
      public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  from(activity in base_query,
    where:
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
423
@doc """
Returns all activities matched by `fetch_activities_for_context_query/2`.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
430
@doc """
Returns the id of the newest direct-visibility activity in `context`, or `nil`
when there is none. Preloading is skipped unless `opts` overrides it.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.merge(%{skip_preload: true}, opts)

  direct_only =
    context
    |> fetch_activities_for_context_query(query_opts)
    |> restrict_visibility(%{visibility: "direct"})

  Repo.one(from(a in direct_only, limit: 1, select: a.id))
end
441
@doc """
Fetches a page of activities addressed to the public collection.

Any `:user` in `opts` is dropped first; unlisted activities are excluded only
when `opts` has `restrict_unlisted: true`.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, :user)

  query =
    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)

  Pagination.fetch_paginated(query, anonymous_opts, pagination)
end
451
@doc """
Same as `fetch_public_or_unlisted_activities/2` with `restrict_unlisted: true`
forced into `opts`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
end
458
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose computed visibility equals
# `opts[:visibility]` (a single value) or is one of its members (a list).
#
# An invalid visibility is logged and the query is returned unrestricted, in
# line with exclude_visibility/2. Returning the result of Logger.error/1
# (`:ok`) here would break every later step of the query pipeline.
# A missing or nil :visibility applies no restriction.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
495
# Drops activities whose computed visibility equals
# `opts[:exclude_visibilities]` (single value) or is one of its members
# (list). Invalid values are logged and leave the query untouched; a missing
# or nil option is a no-op.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  all_valid? = Enum.all?(visibility, fn entry -> entry in @valid_visibilities end)

  if all_valid? do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
538
# Applies the thread_visibility() SQL check for the reading user, unless
# thread containment is skipped instance-wide (third argument) or for that
# user, or there is no reading user.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: reader_ap_id}}, _config) do
  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id') = true", ^reader_ap_id, a.data)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
553
@doc """
Fetches activities of any type authored by `user`, as seen by `reading_user`.
The result of `fetch_activities/2` is reversed before being returned.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
568
@doc """
Fetches the Create and Announce activities authored by `user`, as seen by
`reading_user`.

The reader's blocks and mutes are applied unless the reader blocks `user`.
The result of `fetch_activities/2` is reversed before being returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  base_params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_activity_ids: user.pinned_activities
    })

  params =
    if User.blocks?(reading_user, user) do
      base_params
    else
      Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
594
@doc """
Fetches Create and Announce activities visible to `reading_user` using offset
pagination. The result of `fetch_activities/3` is reversed before being
returned.
"""
def fetch_statuses(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params, :offset)
  |> Enum.reverse()
end
606
# Recipient list used to scope user timelines: empty in godmode (no recipient
# restriction), otherwise the public collection plus, for a logged-in reader,
# the reader and everything they follow.
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reader}) do
  public = Constants.as_public()

  if reader do
    [public, reader.ap_id | User.following(reader)]
  else
    [public]
  end
end
616
# Filters out Announces whose announced object was authored by the filtering
# user. Needs the joined object, so it refuses to run with skip_preload.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _), do: query
636
# Keeps only activities with an id greater than `since_id`; an empty or
# missing since_id applies no restriction.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
644
# Excludes activities whose object carries any of the rejected tags. Needs the
# joined object, so it refuses to run with skip_preload.
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{tag_reject: [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _), do: query
657
# Keeps only activities whose object carries every tag in `tag_all`. Needs the
# joined object, so it refuses to run with skip_preload.
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{tag_all: [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _), do: query
670
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, so it refuses to run with
# skip_preload.
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{tag: tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{tag: single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
690
# Keeps activities whose recipients overlap the given list; with a user, the
# user's own activities are OR-ed in. An empty list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
704
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
710
# Keeps only activities authored by `actor_id`, when given.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
716
# Keeps only activities of the given type: a single type as a binary, or any
# of several types otherwise.
defp restrict_type(query, %{type: single_type}) when is_binary(single_type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^single_type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
726
# Keeps only activities whose data "state" equals the given state.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
732
# Keeps only activities whose object lists `ap_id` among its "likes".
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
741
# With `only_media: true`, keeps only Create activities whose object has a
# non-empty "attachment". Needs the joined object, so it refuses to run with
# skip_preload.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
755
# Reply filtering:
#   * exclude_replies: true          - only top-level posts
#   * reply_visibility: "self"       - top-level posts plus replies addressed
#                                      to the filtering user
#   * reply_visibility: "following"  - top-level posts, replies addressed to
#                                      the user or their friends, and the
#                                      user's own replies
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "self"}) do
  own_ap_id = user.ap_id

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^own_ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{reply_filtering_user: user, reply_visibility: "following"}) do
  own_ap_id = user.ap_id
  self_and_friends = [own_ap_id | User.get_cached_user_friends_ap_ids(user)]

  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?",
      object.data,
      ^self_and_friends,
      activity.recipients,
      activity.actor,
      activity.actor,
      ^own_ap_id
    )
  )
end

defp restrict_replies(query, _), do: query
799
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
805
# Hides activities authored by, or addressed ("to") to, accounts the muting
# user has muted. Unless preloading is skipped, muted threads are hidden too.
# `with_muted: true` disables the filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  muted_ap_ids = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  without_muted_users =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))

  if opts[:skip_preload] do
    without_muted_users
  else
    # the thread_mute binding only exists when preloading was not skipped
    where(without_muted_users, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
825
# Hides activities involving accounts or domains blocked by the blocking user:
# blocked authors, blocked recipients, recipients on blocked domains,
# Announces addressed to blocked accounts, and authors (of the activity or of
# its object) on blocked domains unless the user follows them.
# Joins the object when the query does not have it yet.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "recipients_contain_blocked_domains(?, ?) = false",
      activity.recipients,
      ^domain_blocks
    )
  )
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _), do: query
872
# With `restrict_unlisted: true`, drops activities that carry the public
# collection in "cc".
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  public = [Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end

defp restrict_unlisted(query, _), do: query
886
# With `pinned: true`, keeps only the activities listed in pinned_activity_ids.
defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _), do: query
892
# Hides Announces made by accounts whose reblogs the muting user has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  reblog_muted_ap_ids =
    opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_ap_ids
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
909
# Keeps only activities authored by users whose nickname ends in
# "@<instance>". The matching AP ids are loaded with a separate query first.
defp restrict_instance(query, %{instance: instance}) do
  nickname_pattern = "%@#{instance}"

  instance_ap_ids =
    Repo.all(
      from(u in User,
        where: fragment("? LIKE ?", u.nickname, ^nickname_pattern),
        select: u.ap_id
      )
    )

  where(query, [activity], activity.actor in ^instance_ap_ids)
end

defp restrict_instance(query, _), do: query
923
# Hides activities whose object content matches the regex composed from the
# user's filters; the user's own activities always stay visible. A
# :blocking_user is used as the filtering user when no :user is present.
defp restrict_filtered(query, %{user: %User{} = user}) do
  regex = Filter.compose_regex(user)

  if is_nil(regex) do
    query
  else
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  restrict_filtered(query, %{user: user})
end

defp restrict_filtered(query, _), do: query
943
# Drops activities whose object is an "Answer" (poll vote) unless
# `include_poll_votes: true`. Only applies when the object is joined.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
955
# Drops activities whose object is a "ChatMessage" unless
# `include_chat_messages: true`. Only applies when the object is joined.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
  else
    query
  end
end
967
# Drops activities authored by invisible users unless `invisible_actors: true`.
# The invisible users' AP ids are loaded with a separate query first.
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  invisible_users = Repo.all(User.Query.build(%{invisible: true, select: [:ap_id]}))
  invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: ap_id} -> ap_id end)

  where(query, [activity], activity.actor not in ^invisible_ap_ids)
end
978
# Drops the activity with the given (binary) id from the results.
defp exclude_id(query, %{exclude_id: excluded_id}) when is_binary(excluded_id) do
  where(query, [activity], activity.id != ^excluded_id)
end

defp exclude_id(query, _), do: query
984
# Preloads the activity's object unless `skip_preload: true`.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
991
# Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
defp maybe_preload_bookmarks(query, opts), do: Activity.with_preloaded_bookmark(query, opts[:user])
998
# Preloads report notes only when `preload_report_notes: true`.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
1005
# Sets the thread-muted field for the muting user (falling back to the
# reading user) unless `skip_preload: true`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
1012
# Orders by id when `opts[:order]` is :asc or :desc; otherwise leaves the
# query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
1024
# Loads the muting user's mute / reblog-mute relationships (and blocks, when
# the blocking user is the same account) in a single call and returns three
# opts maps carrying those AP ids for restrict_blocked/2, restrict_muted/2 and
# restrict_muted_reblogs/2. Values already present in `opts` take precedence.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
1046
@doc """
Builds the main activity timeline query.

`recipients` scopes the result to activities addressed to any of the given
AP ids (an empty list applies no recipient restriction). `opts` drives the
optional preloads, ordering and the individual `restrict_*` / `exclude_*`
filters; keys that are absent leave the corresponding filter inactive.

The query is returned unexecuted.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_preload_report_notes(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts[:user])
  |> restrict_replies(opts)
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(restrict_blocked_opts)
  |> restrict_muted(restrict_muted_opts)
  # Applied exactly once: a second pass would only repeat the same WHERE
  # clause and compose the user's filter regex again.
  |> restrict_filtered(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, config)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  |> restrict_instance(opts)
  |> restrict_announce_object_actor(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
  |> exclude_chat_messages(opts)
  |> exclude_invisible_actors(opts)
  |> exclude_visibility(opts)
end
1090
@doc """
Fetches a page of activities for `recipients` plus the list memberships of
`opts[:user]`.

The page is reversed after pagination, and activities delivered through one
of the user's lists get the user added to "cc".
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  reader = opts[:user]
  list_memberships = Pleroma.List.memberships(reader)

  activities =
    (recipients ++ list_memberships)
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts, pagination)
    |> Enum.reverse()

  maybe_update_cc(activities, list_memberships, reader)
end
1099
@doc """
Fetches the activities `user` has favourited, ordered by the favourite itself.

The query starts from the `Like` activities whose actor is `user.ap_id`, joins the liked
object and its activity, and selects that activity with `object` filled in and
`pagination_id` set to the id of the like. Rows are ordered by like id, descending with
nulls last. `skip_order: true` is merged into `params` before they are handed to
`Pagination.fetch_paginated/3`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  likes_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")

  favourites_query =
    likes_query
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  pagination_params = Map.merge(params, %{skip_order: true})

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
1117
# Adds the viewing user's ap_id to the "cc" of every activity whose "bcc" contains at
# least one entry of `list_memberships`.
#
# Only acts when `list_memberships` is a non-empty list and the third argument is a
# `%User{}`; in every other case the activities are returned untouched. Activities
# without a non-empty "bcc" list are passed through as they are.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # `update_in/2` hands the callback `nil` when "cc" is absent, and "cc" may also be
        # a single value instead of a list. `List.wrap/1` keeps the result a proper list
        # in both cases instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1133
# Narrows `query` to activities whose recipients overlap `recipients`, or whose recipients
# overlap `recipients_with_public` while also containing the public address
# (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
1142
@doc """
Fetches one page of activities bounded by two recipient sets.

The base query is built with an empty recipient list, then narrowed by
`fetch_activities_bounded_query/3` using `recipients` and `recipients_with_public`.
The paginated rows are returned in reversed order.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  page = Pagination.fetch_paginated(bounded_query, opts, pagination)

  Enum.reverse(page)
end
1154
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` for it.

On success the stored upload data becomes the object's `data`, with `"actor"` added via
`Maps.put_if_present/3` from `opts[:actor]`. Any result of `Upload.store/2` other than
`{:ok, data}` is returned unchanged.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, stored} ->
      object_data = Maps.put_if_present(stored, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
1163
# Extracts a URL string from the shapes an actor's "url" property can take:
# a binary is returned as is, a map yields its binary "href", a list is resolved
# through its first element, and anything else gives `nil`.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) do
  case url do
    link when is_binary(link) -> link
    %{"href" => href} when is_binary(href) -> href
    [head | _rest] -> get_actor_url(head)
    _other -> nil
  end
end
1175
# Maps a remote actor document (`data`, a string-keyed map) onto the atom-keyed attribute
# map that `make_user_from_ap_id/1` feeds into `User.remote_user_changeset`.
#
# The document comes from a remote server, so attachment and tag entries that do not have
# the expected shape are skipped instead of raising.
defp object_to_user_data(data) do
  # Access on `nil` yields `nil`, so a missing "icon"/"image" results in a `nil` image.
  avatar =
    data["icon"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["icon"]["url"]}]
      }

  banner =
    data["image"]["url"] &&
      %{
        "type" => "Image",
        "url" => [%{"href" => data["image"]["url"]}]
      }

  # `List.wrap/1` tolerates a `nil` or single-object "attachment"; the catch-all clause
  # drops entries without a "type" instead of raising a FunctionClauseError.
  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(fn
      %{"type" => type} -> type == "PropertyValue"
      _ -> false
    end)
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # The generator pattern silently skips tags that are not complete Emoji entries
  # (missing icon url or name); later duplicates of a name win, as with `Map.new/2`.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
          List.wrap(Map.get(data, "tag", [])),
        is_binary(name),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  # `locked` and `capabilities` are read from the document as received; everything after
  # the next rebinding of `data` reads the document returned by
  # `Transmogrifier.maybe_fix_user_object/1`.
  locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    else
      nil
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    else
      nil
    end

  user_data = %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    ap_enabled: true,
    banner: banner,
    fields: fields,
    emoji: emojis,
    locked: locked,
    discoverable: discoverable,
    invisible: invisible,
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  Map.put(user_data, :nickname, nickname)
end
1264
@doc """
Fetches the following and follower collections of `user` and summarises them.

Both `user.following_address` and `user.follower_address` are fetched through
`Fetcher.fetch_and_contain_remote_object_from_id/1` and checked with
`collection_private/1`. On success returns `{:ok, map}` with `:hide_follows`,
`:hide_followers`, `:follower_count` and `:following_count`; the counts come from each
collection's "totalItems" and fall back to `0` when that is not an integer.

An `{:error, _}` from any step is returned as is; any other unexpected value is wrapped
as `{:error, value}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, followers_hidden} <- collection_private(followers) do
    info = %{
      hide_follows: follows_hidden,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: followers_hidden
    }

    {:ok, info}
  else
    {:error, _reason} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1284
# Returns `counter` when it is an integer and `0` for any other value.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1287
@doc """
Merges remote follow information into `user_data[:info]` when all preconditions hold.

The follow information is fetched only if `[:instance, :external_user_synchronization]`
is `true`, `user_data[:type]` is "Person" or "Service", and both `:following_address` and
`:follower_address` are set. When one of these checks is `false` the input is returned
unchanged. Any other failure (including a fetch error) is logged and the input is
returned unchanged as well.
"""
# NOTE(review): the type check reads `user_data[:type]`, while `object_to_user_data/1`
# in this module emits `:actor_type` - confirm which key callers are expected to provide.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {:user_type_check, true} <-
         {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {:collections_available, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched_info)

    Map.put(user_data, :info, merged_info)
  else
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(failure)
      )

      user_data
  end
end
1318
# Decides whether a follow collection is hidden, returning `{:ok, boolean}` or an error.
#
# * An embedded "first" page of type CollectionPage/OrderedCollectionPage -> `{:ok, false}`.
# * Any other "first" value is fetched: a collection page -> `{:ok, false}`; an error
#   carrying an HTTP status of 401 or 403 -> `{:ok, true}`; other `{:error, _}` results are
#   returned as is and anything else is wrapped in `{:error, _}`.
# * Data without "first" -> `{:ok, true}`.
defp collection_private(data) do
  case data do
    %{"first" => %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    %{"first" => first} ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _reason} = failure ->
          failure

        unexpected ->
          {:error, unexpected}
      end

    _without_first ->
      {:ok, true}
  end
end
1335
@doc """
Runs the actor document `data` through `MRF.filter/1` and converts it to user attributes.

Returns `{:ok, user_data}` when the filter returns `{:ok, data}`; any other filter result
is wrapped as `{:error, result}`.
"""
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
    rejected -> {:error, rejected}
  end
end
1343
@doc """
Fetches the actor at `ap_id` and turns it into user attributes.

On success returns `{:ok, user_data}`, where `user_data` has been passed through
`maybe_update_follow_information/1`. Failures are logged and returned as
`{:error, reason}`: a deleted object at debug level, an `{:reject, reason}` at info
level, everything else at error level.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, maybe_update_follow_information(data)}
  else
    # A deleted object is expected, so it is only logged at debug level.
    {:error, "Object has been deleted" = e} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    {:error, {:reject, reason} = e} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
      {:error, e}

    {:error, e} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    # Without this clause any non-`{:error, _}` result would raise a WithClauseError;
    # wrap it instead, as `collection_private/1` does for the same fetcher call.
    other ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(other)}")
      {:error, other}
  end
end
1362
@doc """
Renames an already stored user whose nickname clashes with the one in `data`.

When `data[:nickname]` is a binary and `User.get_by_nickname/1` finds a user with a
different ap_id, that user's nickname is changed to `"<id>.<nickname>"` and the result of
`User.update_and_set_cache/1` is returned. When the stored user has the same ap_id only
an info message is logged. Returns `nil` when there is no binary nickname or no stored
user.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname) do
    if data[:ap_id] == old_user.ap_id do
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )
    else
      Logger.info(
        "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
      )

      renamed = User.remote_user_changeset(old_user, %{nickname: "#{old_user.id}.#{old_user.nickname}"})

      User.update_and_set_cache(renamed)
    end
  else
    _ -> nil
  end
end
1386
@doc """
Creates or refreshes the user identified by `ap_id`.

A cached user for which `User.ap_enabled?/1` is false is handed to
`Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the actor is fetched with
`fetch_and_prepare_user_from_ap_id/1`: a cached user is updated from the fetched data,
while a missing one is inserted after `maybe_handle_clashing_nickname/1` has run.
A failed fetch is returned unchanged.
"""
def make_user_from_ap_id(ap_id) do
  existing = User.get_cached_by_ap_id(ap_id)

  cond do
    existing && !User.ap_enabled?(existing) ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)

    true ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} ->
          if existing do
            changeset = User.remote_user_changeset(existing, data)
            User.update_and_set_cache(changeset)
          else
            maybe_handle_clashing_nickname(data)

            inserted = Repo.insert(User.remote_user_changeset(data))
            User.set_cache(inserted)
          end

        failure ->
          failure
      end
  end
end
1409
@doc """
Resolves `nickname` through `WebFinger.finger/1` and builds the user from its ap_id.

Returns the result of `make_user_from_ap_id/1`, or
`{:error, "No AP id in WebFinger"}` when the lookup does not yield a non-nil "ap_id".
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1417
# Filters out broken threads: delegates to `entire_thread_visible_for_user?/2`
# for the given activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1422
# Post-processing hook for a single activity; currently it only applies
# `contain_broken_threads/2` and returns its result.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1427
@doc """
Builds a query over `Activity` restricted to type "Create" and visibility "direct",
ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
1434 end